#### Concepts you will learn in this notebook:
- Use the *Google Scholar* data set, a CSV file whose fields include research_interest, author_name, and email.
- Read the *research_interest* field from a CSV file in *Google Drive* into an RDD, then use *flatMap* and *reduceByKey* to count the occurrences of each research interest. *flatMap* applies a function to every element of an RDD and flattens the results into a new RDD, so one input row can produce several output records. *reduceByKey* merges the values of each key (here, each research interest) with a reducing operator (add). In our example, every interest is emitted as an (interest, 1) pair and add sums the 1s per interest.
- Use an aggregate function (mean) and sort a dataframe by a column.
- Create a new column and fill it with a *User Defined Function (UDF)* applied to a field of a Spark dataframe.
- Convert an RDD into a dataframe with or without a schema.
- Apply a filter to a Spark dataframe.
- Write a Spark dataframe as a single CSV file to a Google Drive folder.
- Spark *Join* concept applied on dataframes.
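
To make the *flatMap* and *reduceByKey* idea concrete before any Spark code runs, here is a minimal plain-Python sketch of what the two operations compute. The helpers `flat_map` and `reduce_by_key` and the sample rows are hypothetical stand-ins for illustration, not Spark APIs; Spark performs the same steps in a distributed way.

```python
from operator import add

# Hypothetical sample rows; the real values come from the research_interest column.
rows = [
    "data_mining##anomaly_detection",
    "Machine_Learning##data_mining",
]

def flat_map(func, items):
    """Apply func to every item and flatten the resulting lists into one list."""
    return [out for item in items for out in func(item)]

def reduce_by_key(func, pairs):
    """Merge the values of each key with the binary operator func."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return merged

# one (interest, 1) pair per '##'-separated interest
pairs = flat_map(lambda row: [(w.lower(), 1) for w in row.split("##")], rows)
# sum the 1s per interest
counts = reduce_by_key(add, pairs)
print(counts)  # {'data_mining': 2, 'anomaly_detection': 1, 'machine_learning': 1}
```

Two rows produce four pairs after the flatten step, which is why a plain `map` would not be enough here.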

###### START of PRE-REQUISITE

##### Use apt-get to install basic libraries needed to enable pyspark

In [None]:
!apt-get update 

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 https://developer.download.nvidia.com/comp

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
is_distributed = False
!pip install -q findspark
!pip install pytorch_lightning

# !pip install elephas
# !pip install analytics-zoo
# !pip install bigdl

Collecting pytorch_lightning
  Downloading pytorch_lightning-1.5.8-py3-none-any.whl (526 kB)

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"
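
If the download or extraction step failed, `findspark.init()` will complain later with a less obvious message. A small check like the following sketch can surface the problem early; `missing_dirs` is a hypothetical helper name, not part of findspark or PySpark.

```python
import os

def missing_dirs(env_vars):
    """Return the names of environment variables that are unset
    or do not point to an existing directory."""
    return [name for name in env_vars
            if not os.path.isdir(os.environ.get(name, ""))]

# An empty list means both locations exist on this machine.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```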

In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

##### Mount Google Drive

In [None]:
# mount google drive to the linux running this Colab application
from google.colab import drive
drive.mount('/content/drive')
# list few directories (check access)
! ls -ltr /content/drive/MyDrive/dataset/
! ls -ltr /content/drive/MyDrive/data_processing/
!ls -ltr /content/drive/MyDrive/data_processing/results/top_10_corona_dist/
!ls -ltr /content/drive/MyDrive/ |head -2

Mounted at /content/drive
total 17
drwx------ 2 root root 4096 Jan  4 21:56  dataset_json
drwx------ 2 root root 4096 Jan  4 21:56  dataset_html
drwx------ 2 root root 4096 Jan  4 21:56  dataset_csv
drwx------ 2 root root 4096 Jan  4 21:59  dataset_xml
-rw------- 1 root root  151 Jan 11 02:09 'readME -> dataset.gdoc'
total 8
drwx------ 2 root root 4096 Jan  4 21:55 results
drwx------ 2 root root 4096 Jan  4 21:55 intermediate
total 1
-rw------- 1 root root 255 Jan 13 19:16 part-00000-de3a3f7b-bff7-49f8-a6ff-8f4cfa8e6fad-c000.csv
-rw------- 1 root root   0 Jan 13 19:16 _SUCCESS
total 3854358
-rw------- 1 root root   6116583 May 20  2011 Addresses.mp3


##### Variable Declarations

In [None]:
# data sets
path_covid = "/content/drive/MyDrive/dataset/dataset_csv/dataset-covid/cdc-pfizer-covid-19-vaccine-distribution-by-state.csv"
path_power = "/content/drive/MyDrive/dataset/dataset_csv/dataset-powerlifting-csv/openpowerlifting-2021-12-10-b420db66.csv"
path_titanic_train = "/content/drive/MyDrive/dataset/dataset_csv/dataset-titanic/train.txt"
path_google_scholar = "/content/drive/MyDrive/dataset/dataset_csv/dataset-google-scholar/output.csv"
# base output path
path_out_base_result = "/content/drive/MyDrive/data_processing/results/"

In [None]:
# output file for top 10 corona distribution
path_out_avg = path_out_base_result + "top_10_corona_dist"

In [None]:
# IMPORT important Libraries
import pyspark.sql.functions as F
import pandas as pd
from tabulate import tabulate
import traceback
import numpy as np
import matplotlib.pyplot as plt
from requests import get
import requests
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import torch
import matplotlib
import pytorch_lightning
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def pretty_print_pandas(title, df, n):
  """ Pretty print
  """
  print(f"{title}:")
  print(tabulate(df.head(n), headers="keys", tablefmt="psql" ))

##### Sample a few of the available data sets

In [None]:
!ls -ltr /content/drive/MyDrive/dataset/dataset_csv/dataset-powerlifting-csv/openpowerlifting-2021-12-10-b420db66.csv
!echo "--------------> tit"
!ls -ltr /content/drive/MyDrive/dataset/dataset_csv/
!echo "--------------> tit_train"
!head -2 /content/drive/MyDrive/dataset/dataset_csv/dataset-titanic/train.csv
!echo "--------------> myDrive"
!ls -ltr /content/drive/MyDrive/ | head -5
!head -2 /content/drive/MyDrive/dataset/dataset_csv/dataset-google-scholar/output.csv

-rw------- 1 root root 502549264 Dec 10 06:09 /content/drive/MyDrive/dataset/dataset_csv/dataset-powerlifting-csv/openpowerlifting-2021-12-10-b420db66.csv
--------------> tit
total 20
drwx------ 2 root root 4096 Dec 10 06:56 dataset-powerlifting-csv
drwx------ 2 root root 4096 Dec 11 07:34 dataset-covid
drwx------ 2 root root 4096 Jan  3 19:48 dataset-titanic
drwx------ 2 root root 4096 Jan  4 06:09 dataset-google-scholar
drwx------ 2 root root 4096 Jan 11 02:13 dataset-covid-2
--------------> tit_train
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
--------------> myDrive
total 3854358
-rw------- 1 root root   6116583 May 20  2011 Addresses.mp3
-rw------- 1 root root   5384507 May 20  2011 All in the Family.mp3
-rw------- 1 root root   4926026 May 20  2011 Allergies.mp3
-rw------- 1 root root   7512985 May 20  2011 Asking for Favors.mp3
author_name,email,affiliation,coauthors_names,research_

##### Read input files

In [None]:
# read covid vaccine weekly distribution 
df_in_covid = spark \
              .read \
              .option("header", True) \
              .csv(path_covid)

# read google scholar
df_gs = spark \
        .read \
        .option("header", True) \
        .csv(path_google_scholar)


###### END OF PRE-REQUISITE

### Google Scholar -> calculate frequency of research interest

In [None]:
%%time
# CALCULATE FREQUENCY FOR EACH WORD IN RESEARCH INTEREST
# (%%time is a cell magic, so it must be the first line of the cell)
from operator import add
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# read into dataframe (df)
df_gs = spark.read.option("header", True).csv(path_google_scholar)
# drop rows whose research_interest is the literal string 'None'
df_gs_clean = df_gs.filter("research_interest != 'None'")
# keep only the research_interest value of each row
rdd_ri = df_gs_clean.rdd.map(lambda x: (x["research_interest"]))
print("\nSample RDD rows:")
print(rdd_ri.take(5))
print("\nSample RDD rows after frequency count for each word:")
# flatMap() emits one (interest, 1) pair per '##'-separated interest;
# reduceByKey(add) then sums the 1s for each interest
rdd_ri_freq = rdd_ri.flatMap(lambda x: [(w.lower(), 1) for w in x.split('##')]).reduceByKey(add)
# take(5) returns the first 5 elements of the RDD as a list
print(rdd_ri_freq.take(5))

# approach 1 : convert to df without any schema (no proper col names)
df_ri_freq = rdd_ri_freq.toDF() 

pretty_print_pandas("RI freq without schema", df_ri_freq, 10)

# approach 2 : convert to df with schema
schema = StructType([StructField("ri", StringType(), False), 
                     StructField("frequency", IntegerType(), False)
])
# convert rdd to df with schema
df = spark.createDataFrame(rdd_ri_freq, schema)
print("\nProposed Schema of DF:")
# print schema (to verify)
df.printSchema()
print("\nRDD converted to DF with schema:")
# sort
df_sort = df.sort(F.col("frequency").desc())
df_sort.show(10, truncate=False)



Sample RDD rows:
['data_mining##anomaly_detection', 'artificial_intelligence##machine_learning##data_mining##graph_mining##security', 'machine_learning##never_ending_learning##lifelong_machine_learning##medical_informatics', 'graph_mining##big_data_analytics##machine_learning', 'network_security##cyber_physical_systems_security##cyber_education_and_workforce_development']

Sample RDD rows after frequency count for each word:
[('data_mining', 63), ('anomaly_detection', 5), ('artificial_intelligence', 123), ('machine_learning', 198), ('graph_mining', 5)]
RI freq without schema:
+---------------------------+-----+
| 0                         |   1 |
|---------------------------+-----|
| data_mining               |  63 |
| anomaly_detection         |   5 |
| artificial_intelligence   | 123 |
| machine_learning          | 198 |
| graph_mining              |   5 |
| security                  |  25 |
| never_ending_learning     |   1 |
| lifelong_machine_learning |   1 |
| medical_informatic

In [None]:
# This example reads the whole Google Scholar file as raw text lines, so every column is processed in the RDD

# rdd
rdd = spark.sparkContext.textFile(path_google_scholar)
print(type(rdd))
counts = rdd.flatMap(lambda x: [(w.lower(), 1) for w in x.split(',')]).reduceByKey(add)
print(counts.take(5))

<class 'pyspark.rdd.RDD'>
[('author_name', 1), ('email', 1), ('affiliation', 1), ('coauthors_names', 1), ('research_interest', 1)]
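
Splitting each line on `','` treats every comma as a delimiter, including commas inside quoted fields. Whether the Google Scholar file contains such fields is not shown here, so the sketch below uses the Titanic row printed earlier in this notebook to illustrate the difference between a naive split and Python's standard `csv` reader.

```python
import csv
import io

# One row copied from the Titanic sample shown earlier in this notebook.
line = '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S'

naive_fields = line.split(",")                    # ignores the quotes
csv_fields = next(csv.reader(io.StringIO(line)))  # honours the quotes

print(len(naive_fields))  # 13: the quoted name is cut in two
print(len(csv_fields))    # 12: matches the 12 header columns
print(csv_fields[3])      # Braund, Mr. Owen Harris
```

This is one reason the other cells read CSV files through `spark.read.csv`, which parses quoted fields, rather than through `textFile` and `split`.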


### UDF to create a new field "is_artificial_intelligence" holding a 1/0 flag

In [None]:
from pyspark.sql.types import StringType, IntegerType
import traceback

lst_ai  = ["data_science", "artificial_intelligence",
           "machine_learning"]

# declare the return type; a bare @F.udf would produce a string column
@F.udf(returnType=IntegerType())
def is_ai(research):
    """ return 1 if research in AI domain else 0
    """
    try:
      # split the research interest string with delimiter "##"  
      lst_research = [w.lower() for w in str(research).split("##")]

      for res in lst_research:
        # if present in AI domain
        if res in lst_ai:
          return 1
      # not present in AI domain
      return 0
    except Exception:
      # unexpected input: flag the row with -1
      return -1
 
# df read 
df_gs = spark.read.option("header", True).csv(path_google_scholar)
# create a new column "is_artificial_intelligence"
df_gs_new = df_gs.withColumn("is_artificial_intelligence",\
                             is_ai(F.col("research_interest")))
# df_gs_new.printSchema()
df_gs.show(5, truncate=False)
df_gs_new.show(n=20)
print("Verify that is_artificial_intelligence has only the values 0 and 1 (-1 would mark a failed row)")
df_gs_new.select("is_artificial_intelligence").distinct().show(5)
# show selective columns for analysis
df_gs_new[df_gs_new["author_name"].isin(["Christa Cody", "Gabriel Weimann", ""])]\
    .select("author_name","research_interest","is_artificial_intelligence")\
    .show(5, truncate=False)


+----------------------+------------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|author_name           |email             |affiliation                       |coauthors_names                                                                                                                                      |research_interest                                                                           |
+----------------------+------------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|William Eberle        |tntech.edu
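
Because a UDF runs inside Spark workers, its logic is easier to check as an ordinary Python function first. The following is a minimal sketch of the same matching rule; `is_ai_plain` and `AI_TOPICS` are hypothetical names used only for this illustration.

```python
AI_TOPICS = {"data_science", "artificial_intelligence", "machine_learning"}

def is_ai_plain(research):
    """Return 1 if any '##'-separated interest is an AI topic, else 0."""
    if research is None:
        return 0
    interests = {w.strip().lower() for w in research.split("##")}
    return 1 if interests & AI_TOPICS else 0

print(is_ai_plain("data_mining##Machine_Learning"))  # 1
print(is_ai_plain("graph_mining##security"))         # 0
print(is_ai_plain(None))                             # 0
```

Once the plain function behaves as expected, the same body can be wrapped with `F.udf` as in the cell above.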

In [None]:
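# RDD TO DATA FRAME (MINIMALIST EXAMPLE)
# illustrative rows (assumed sample data): first name, last name, gender, salary
df_emp = spark.createDataFrame([("James", "Smith", "M", 30), ("Anna", "Rose", "F", 41)],
                               ["firstname", "lastname", "gender", "salary"])
# combine the two name fields and double the salary
rdd2 = df_emp.rdd.map(lambda x: (x[0] + "," + x[1], x[2], x[3] * 2))
df2 = rdd2.toDF(["name", "gender", "new_salary"])
df2.show()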


##### Reference
[View RDD content](https://stackoverflow.com/questions/25295277/view-rdd-contents-in-python-spark)


#### TOP 10 STATES BY WEEKLY 1ST DOSE VACCINE DISTRIBUTION

###### Write as CSV to Google Drive

In [None]:
# print sample
df_in_covid.show(n=2, truncate=False)
# average 1st-dose allocation per jurisdiction, sorted in descending order
df_avg_1 = df_in_covid.groupby("jurisdiction")\
  .agg(F.avg("_1st_dose_allocations")
  .alias("avg"))\
  .sort(F.col("avg").desc())\
  .toDF("state", "avg")

print("Top 10 States by 1st dose covid vaccine distribution")
df_avg_1.show(n=10)
print(type(df_avg_1))
# write the top 10 states by average weekly allocation as a single CSV part file
df_avg_1.limit(10) \
        .coalesce(1) \
        .write \
        .mode("overwrite") \
        .option("header", True) \
        .option("quoteAll",True) \
        .csv(path_out_avg)

+------------+-----------------------+---------------------+---------------------+
|jurisdiction|week_of_allocations    |_1st_dose_allocations|_2nd_dose_allocations|
+------------+-----------------------+---------------------+---------------------+
|Connecticut |2021-06-21T00:00:00.000|54360                |54360                |
|Maine       |2021-06-21T00:00:00.000|21420                |21420                |
+------------+-----------------------+---------------------+---------------------+
only showing top 2 rows

Top 10 States by 1st dose covid vaccine distribution
+----------------+----------+
|           state|       avg|
+----------------+----------+
|      California|  561307.5|
|           Texas| 384333.75|
|         Florida|306883.125|
|Federal Entities|  213150.0|
|            Ohio| 168761.25|
|    Pennsylvania|166415.625|
|        New York| 164036.25|
|  North Carolina| 147026.25|
|         Georgia| 146036.25|
|        Illinois| 146036.25|
+----------------+----------+
only

###### Rename the CSV part file written by Spark to a human-readable file name

In [None]:
from os import listdir
import os

def find_csv_filenames( path_to_dir, suffix=".csv" ):
    """ return list of filenames that ends with suffix
    """
    filenames = listdir(path_to_dir)
    return [ filename for filename in filenames if filename.endswith( suffix ) ]

# full path of the CSV part file that Spark just wrote
path_csv_file_path = path_out_avg + "/" + find_csv_filenames(path_out_avg)[0]
# target path with a human-readable file name
path_new_file = path_out_avg + "/" + "top_10_states.csv"
# print the old and the new file name
print(f"path_csv_file_path: {path_csv_file_path} \n path_new_file: {path_new_file}")
# rename file
os.rename(path_csv_file_path, path_new_file)

path_csv_file_path: /content/drive/MyDrive/data_processing/results/top_10_corona_dist/top_10_states.csv 
 path_new_file: /content/drive/MyDrive/data_processing/results/top_10_corona_dist/top_10_states.csv
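
The cell above takes whichever `.csv` file comes first in the folder, which may not be the Spark part file once the folder holds other CSV files. A slightly stricter variant is sketched below; `rename_single_part_file` is a hypothetical helper, and the demonstration runs in a temporary directory with a fake part file rather than on Google Drive.

```python
import glob
import os
import tempfile

def rename_single_part_file(out_dir, new_name):
    """Rename the only part-*.csv file in out_dir to new_name and return the new path."""
    matches = glob.glob(os.path.join(out_dir, "part-*.csv"))
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    target = os.path.join(out_dir, new_name)
    os.replace(matches[0], target)  # overwrites an existing target file
    return target

# Demonstration in a throwaway directory with an empty, fake part file.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part-00000-demo-c000.csv"), "w"):
    pass
new_path = rename_single_part_file(demo_dir, "top_10_states.csv")
print(os.path.basename(new_path))  # top_10_states.csv
```

Matching on the `part-` prefix relies on the naming pattern visible in the directory listing earlier in this notebook.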


###### PANDAS: Calculate Average For Each State

In [None]:
# PANDAS - Example of reading a csv and writing as a CSV
path_new_file = path_out_avg + "/" + "top_10_states.csv"
print(f"input file -> {path_new_file}")
# read already existing file
df_in_state = pd.read_csv(path_new_file)
# sample
print(df_in_state.head(2))
# calculate avg for each state (the input file already holds one avg per state,
# so this only demonstrates the same aggregation in pandas)
df_in_top10 = df_in_state.groupby("state")["avg"].mean().to_frame("avg").reset_index()
# output 2
path_new_file_pd = path_out_avg + "/" + "top_10_states_pandas.csv"
print(f"output to {path_new_file_pd}")
# write pandas df to csv
df_in_top10.to_csv(path_new_file_pd)

input file -> /content/drive/MyDrive/data_processing/results/top_10_corona_dist/top_10_states.csv
        state        avg
0  California  561307.50
1       Texas  384333.75
output to /content/drive/MyDrive/data_processing/results/top_10_corona_dist/top_10_states_pandas.csv


## SPARK: Average of 1st and 2nd DOSE

In [None]:
# for each state, calculate average # 1st dose, average # 2nd dose

print(list(df_in_covid))

# average and total weekly 1st- and 2nd-dose allocations per jurisdiction
df_avg_1 = df_in_covid.groupby("jurisdiction")\
  .agg(F.avg("_1st_dose_allocations").alias("avg_1"), \
       F.avg("_2nd_dose_allocations").alias("avg_2"), \
       F.sum("_1st_dose_allocations").alias("sum_1"), \
       F.sum("_2nd_dose_allocations").alias("sum_2")
       ) \
  .sort(F.col("avg_1").desc())


df_avg_1.show(15)


[Column<'jurisdiction'>, Column<'week_of_allocations'>, Column<'_1st_dose_allocations'>, Column<'_2nd_dose_allocations'>]
+----------------+----------+----------+---------+---------+
|    jurisdiction|     avg_1|     avg_2|    sum_1|    sum_2|
+----------------+----------+----------+---------+---------+
|      California|  561307.5|  561307.5|8980920.0|8980920.0|
|           Texas| 384333.75| 384333.75|6149340.0|6149340.0|
|         Florida|306883.125|306883.125|4910130.0|4910130.0|
|Federal Entities|  213150.0|  213150.0|3197250.0|3197250.0|
|            Ohio| 168761.25| 168761.25|2700180.0|2700180.0|
|    Pennsylvania|166415.625|166415.625|2662650.0|2662650.0|
|        New York| 164036.25| 164036.25|2624580.0|2624580.0|
|  North Carolina| 147026.25| 147026.25|2352420.0|2352420.0|
|         Georgia| 146036.25| 146036.25|2336580.0|2336580.0|
|        Illinois| 146036.25| 146036.25|2336580.0|2336580.0|
|        Michigan|144826.875|144826.875|2317230.0|2317230.0|
|      New Jersey| 12913
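
What `groupby(...).agg(F.avg(...), F.sum(...))` followed by a descending sort computes can be sketched in plain Python. The rows below are hypothetical, and the values are kept as strings on purpose because a CSV read without a schema yields string columns that have to be converted before arithmetic.

```python
from collections import defaultdict

# Hypothetical (jurisdiction, first-dose allocation) rows.
rows = [("A", "100"), ("B", "40"), ("A", "300"), ("B", "60")]

# group the numeric values by jurisdiction
values_by_state = defaultdict(list)
for state, allocation in rows:
    values_by_state[state].append(float(allocation))

# one (state, average, sum) record per group, largest average first
summary = [(state, sum(v) / len(v), sum(v)) for state, v in values_by_state.items()]
summary.sort(key=lambda record: record[1], reverse=True)
print(summary)  # [('A', 200.0, 400.0), ('B', 50.0, 100.0)]
```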

### *Power-lifting* data set => spark aggregate & filter example

In [None]:
print(path_power)
# read power csv
df_power = spark.read.option("header", True).csv(path_power)
# show() prints the rows itself and returns None, so it is not wrapped in print()
df_power.show(2)
# groupby Sex
df2 = df_power.groupby("Sex").agg(F.countDistinct("AgeClass")).toDF("gender", "count")
df2.show(n=2)
# Sex "Mx" -> Show list of "AgeClass" available
print("Mx")
df_power.filter("Sex == 'Mx'").select("AgeClass").distinct().show(n=5)
# Sex "M" -> Show list of "AgeClass" available
df_power.filter("Sex == 'M'").select("AgeClass").distinct().show(n=5)
# maximum Best3SquatKg per Sex (cast to double: columns read from CSV without a schema are strings)
df_best_3squat = df_power.groupby("Sex").agg(F.max(F.col("Best3SquatKg").cast("double"))).toDF("Sex", "Best3SquatKgMax")
df_best_3squat.show(n=5)

/content/drive/MyDrive/dataset_csv/dataset-powerlifting-csv/openpowerlifting-2021-12-10-b420db66.csv
+------------------+---+-----+---------+---+--------+--------------+--------+------------+-------------+--------+--------+--------+--------+------------+--------+--------+--------+--------+------------+-----------+-----------+-----------+-----------+---------------+-------+-----+------+------+------------+--------+------+-------+-----+----------+----------------+----------+-----------+---------+--------+---------------+
|              Name|Sex|Event|Equipment|Age|AgeClass|BirthYearClass|Division|BodyweightKg|WeightClassKg|Squat1Kg|Squat2Kg|Squat3Kg|Squat4Kg|Best3SquatKg|Bench1Kg|Bench2Kg|Bench3Kg|Bench4Kg|Best3BenchKg|Deadlift1Kg|Deadlift2Kg|Deadlift3Kg|Deadlift4Kg|Best3DeadliftKg|TotalKg|Place|  Dots| Wilks|Glossbrenner|Goodlift|Tested|Country|State|Federation|ParentFederation|      Date|MeetCountry|MeetState|MeetTown|       MeetName|
+------------------+---+-----+---------+---+-------
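
One pitfall with aggregates on this data set: `spark.read.option("header", True).csv(...)` without schema inference loads every column as a string, and the maximum of strings is lexicographic rather than numeric. The plain-Python sketch below shows the effect on hypothetical squat values.

```python
# Hypothetical squat values as they would arrive from a CSV read without a schema.
best_squats = ["95", "100.5", "220", ""]

string_max = max(best_squats)                           # compares character by character
numeric_max = max(float(v) for v in best_squats if v)   # skips the empty value

print(string_max)   # 95
print(numeric_max)  # 220.0
```

Casting the column to a numeric type, or reading with the `inferSchema` option, avoids the problem in Spark.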

### Spark INNER JOIN EXAMPLE

In [None]:
# create an alias for each of the dataframes to be joined
A = df_power.alias("A")
B = df_best_3squat.alias("B")
# join on sex column
df_join = A.join(B, F.col("A.Sex") == F.col("B.Sex"), 'inner')
lst_interest = ["A.Name", "A.Sex", "A.Best3SquatKg", "B.Best3SquatKgMax"]
df_join.filter("A.Sex == 'F'").select(*lst_interest).show(n=3, truncate=False)
df_join.filter("A.Sex == 'M'").select(*lst_interest).show(n=3, truncate=False)
df_join.filter("A.Sex == 'Mx'").select(*lst_interest).show(n=3, truncate=False)
# alternative form when the key columns (e.g. id and name) have the same name in both dataframes
# df_join = A.join(B, ['id', 'name'], 'inner')
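
The semantics of an inner join can be sketched in plain Python: a row of the left side survives only if its key also exists on the right side. The rows below are hypothetical stand-ins for `df_power` (A) and `df_best_3squat` (B).

```python
# Hypothetical rows: (name, sex, best squat in kg).
lifters = [("Ann", "F", 100.0), ("Bob", "M", 180.0), ("Cy", "Mx", 90.0)]
# Hypothetical per-sex maxima; there is deliberately no entry for "Mx".
max_by_sex = {"F": 150.0, "M": 250.0}

# inner join on sex: keep a lifter only when the key exists on both sides
joined = [(name, sex, squat, max_by_sex[sex])
          for name, sex, squat in lifters
          if sex in max_by_sex]
print(joined)  # [('Ann', 'F', 100.0, 150.0), ('Bob', 'M', 180.0, 250.0)]
```

The "Mx" row disappears because it has no match, which is the behaviour that distinguishes an inner join from a left join.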

### References

- https://www.tutorialspoint.com/pyspark/pyspark_rdd.htm
- https://sparkbyexamples.com/pyspark/pyspark-udf-user-defined-function/
- https://sparkbyexamples.com/spark/print-the-contents-of-rdd-in-spark-pyspark/
- PipelinedRDD creation when using the map operation: https://stackoverflow.com/questions/44355416/need-instance-of-rdd-but-returned-class-pyspark-rdd-pipelinedrdd