In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
%cd /content/drive/MyDrive/PysparkSource

/content/drive/MyDrive/PysparkSource


In [3]:
path_google_scholar = "/content/drive/MyDrive/PysparkSource/output.csv"
path_covid = "/content/drive/MyDrive/PysparkSource/COVID-19_Vaccine_Distribution_Allocations_by_Jurisdiction_-_Pfizer_20240120.csv"
path_covid2 = "/content/drive/MyDrive/PysparkSource/us-counties.csv"
path_out_base_result = "/content/drive/MyDrive/PysparkSource/results"

In [4]:
path_out_avg = path_out_base_result + "top_10_corona+dist"

In [11]:
# Import Important libraries
import pyspark.sql.functions as F
import pandas as pd
from tabulate import tabulate
import traceback
import numpy as np
import matplotlib.pyplot as plt
from requests import get
import os
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import torch
import matplotlib
import pytorch_lightning
from __future__ import print_function
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [12]:
def pretty_print_pandas(title, df, n):
  """ Pretty print """
  print(f"{title}:")
  print(tabulate(df.head(n), headers="keys", tablefmt="psql"))

In [17]:
import findspark
findspark.init()

In [18]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("Colab").config("spark.ui.port", '4050').getOrCreate()

In [20]:
## read google scholar
df_gs = spark.read.option("header", True).csv(path_google_scholar)

## read covid
df_covid = spark.read.option("header", True).csv(path_covid)

## read covid 2
df_covid2 = spark.read.option("header", True).csv(path_covid2)

df_covid2.show(n=2)

+----------+---------+----------+-----+-----+------+
|      date|   county|     state| fips|cases|deaths|
+----------+---------+----------+-----+-----+------+
|2020-01-21|Snohomish|Washington|53061|    1|     0|
|2020-01-22|Snohomish|Washington|53061|    1|     0|
+----------+---------+----------+-----+-----+------+
only showing top 2 rows



In [21]:
c1 = df_covid.select(F.lower("jurisdiction")).distinct().count()
c2 = df_covid2.select(F.lower("state")).distinct().count()
print(f"c1: {c1} c2: {c2}")

c1: 63 c2: 56


In [22]:
# unique states from covid ds1
df_covid.select(F.lower("jurisdiction")).distinct().show(n=10)

# unique states from covid ds 2
df_covid2.select("state").distinct().show(n=10)

+-------------------+
|lower(jurisdiction)|
+-------------------+
|      west virginia|
|      new hampshire|
|    mariana islands|
|            alabama|
|           new york|
|   federal entities|
|     american samoa|
|     north carolina|
|            chicago|
|       pennsylvania|
+-------------------+
only showing top 10 rows

+--------------------+
|               state|
+--------------------+
|                Utah|
|              Hawaii|
|           Minnesota|
|                Ohio|
|Northern Mariana ...|
|              Oregon|
|            Arkansas|
|               Texas|
|        North Dakota|
|        Pennsylvania|
+--------------------+
only showing top 10 rows



## Google scholar --> calculate frequency of research interest

In [24]:
# Calculate frequency for each word in research interest
from pyspark import SparkContext
from operator import add
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import SQLContext

In [26]:
## research_interest can't be None
df_gs_clean = df_gs.filter("research_interest != 'None'")
## referring Columns Names
rdd_ri = df_gs_clean.rdd.map(lambda x: (x["research_interest"]))
print("\nSample RDD rows:")
print(rdd_ri.take(5))
print("\nSample RDD rows after frequency count for each words")


Sample RDD rows:
['data_mining##anomaly_detection', 'artificial_intelligence##machine_learning##data_mining##graph_mining##security', 'machine_learning##never_ending_learning##lifelong_machine_learning##medical_informatics', 'graph_mining##big_data_analytics##machine_learning', 'network_security##cyber_physical_systems_security##cyber_education_and_workforce_development']

Sample RDD rows after frequency count for each words


In [27]:
rdd_ri_freq = rdd_ri.flatMap(lambda x: [(w.lower(), 1) for w in x.split('##')]).reduceByKey(add)
## rdd print with take() function
print(rdd_ri_freq.take(5))

# approach 1 : convert to df without any schema (no proper col names)
df_ri_freq = rdd_ri_freq.toDF()

pretty_print_pandas("RI freq without schema", df_ri_freq, 10)

[('data_mining', 63), ('anomaly_detection', 5), ('artificial_intelligence', 123), ('machine_learning', 198), ('graph_mining', 5)]
RI freq without schema:
+---------------------------+-----+
| 0                         |   1 |
|---------------------------+-----|
| data_mining               |  63 |
| anomaly_detection         |   5 |
| artificial_intelligence   | 123 |
| machine_learning          | 198 |
| graph_mining              |   5 |
| security                  |  25 |
| never_ending_learning     |   1 |
| lifelong_machine_learning |   1 |
| medical_informatics       |   7 |
| big_data_analytics        |   5 |
+---------------------------+-----+


In [28]:
# approach 2: convert to df with schema
schema = StructType([StructField("ri", StringType(), False),
                     StructField("frequency", IntegerType(), False)])
# convert rdd to df with schema
df = spark.createDataFrame(rdd_ri_freq, schema)
print("\nProposed Schema of DF:")
df.printSchema()
print("\nRDD converted to DF with schema")
# sort
df_sort = df.sort(F.col("frequency").desc())
df_sort.show(10, truncate=False)


Proposed Schema of DF:
root
 |-- ri: string (nullable = false)
 |-- frequency: integer (nullable = false)


RDD converted to DF with schema
+---------------------------+---------+
|ri                         |frequency|
+---------------------------+---------+
|machine_learning           |198      |
|artificial_intelligence    |123      |
|software_engineering       |94       |
|computer_vision            |81       |
|data_mining                |63       |
|natural_language_processing|63       |
|robotics                   |53       |
|human_computer_interaction |52       |
|information_retrieval      |42       |
|deep_learning              |36       |
+---------------------------+---------+
only showing top 10 rows



In [29]:
# This example takes all the columns in the given google scholar file and process the rdd
# rdd
rdd = spark.sparkContext.textFile(path_google_scholar)
print(type(rdd))
counts = rdd.flatMap(lambda x: [(w.lower(), 1) for w in x.split(',')]).reduceByKey(add)
print(counts.take(5))

<class 'pyspark.rdd.RDD'>
[('author_name', 1), ('email', 1), ('affiliation', 1), ('coauthors_names', 1), ('research_interest', 1)]


## UDF to create a new field "is_artifical_intelligence" of boolean type

In [32]:
lst_ai = ["data_science", "artificial_intelligence", "machine_learning"]

In [33]:
@F.udf
def is_ai(research):
  """ return 1 if research in AI domain else 0 """
  try:
    # split the research interest string with delimiter "##"
    lst_research = [w.lower() for w in str(research).split("##")]

    for res in lst_research:
      # if present in AI domain
      if res in lst_ai:
        return 1
      # not present in AI domain
      return 0
  except:
    return -1

# df read
df_gs = spark.read.option("header", True).csv(path_google_scholar)
# create a new column "is_artificial_intelligence"
df_gs_new = df_gs.withColumn("is_artificial_intelligence", is_ai(F.col("research_interest")))

# df_gs_new.printSchema()
df_gs.show(5, truncate=False)

+----------------------+------------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|author_name           |email             |affiliation                       |coauthors_names                                                                                                                                      |research_interest                                                                           |
+----------------------+------------------+----------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------+
|William Eberle        |tntech.edu

In [34]:
df_gs_new.show(n=20)
print(f"Verify that is_ai should have only 2 distinct value: 0 & 1")

+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------------+
|         author_name|              email|         affiliation|     coauthors_names|   research_interest|is_artificial_intelligence|
+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------------+
|      William Eberle|         tntech.edu|Tennessee Technol...|                NULL|data_mining##anom...|                         0|
|     Lawrence Holder|            wsu.edu|Washington State ...|Diane J Cook##Wil...|artificial_intell...|                         1|
|          Talbert DA|         tntech.edu|Tennessee Technol...|                NULL|machine_learning#...|                         1|
|Dr. Sirisha Velam...| crraoaimscs.res.in|                NULL|William Eberle##L...|graph_mining##big...|                         0|
|      Ambareen Siraj|         tntech.edu|Tennessee Tech Un...|Rayfor

In [36]:
df_gs_new.select("is_artificial_intelligence").distinct().show(5)

+--------------------------+
|is_artificial_intelligence|
+--------------------------+
|                         0|
|                         1|
+--------------------------+



In [38]:
df_gs_new[df_gs_new["author_name"].isin(["Christa Cody", "Gabriel Weimann", ""])].select("author_name","research_interest","is_artificial_intelligence") .show(5, truncate=False)

+---------------+-----------------------------------------------------------------+--------------------------+
|author_name    |research_interest                                                |is_artificial_intelligence|
+---------------+-----------------------------------------------------------------+--------------------------+
|Christa Cody   |artificial_intelligence##machine_learning##educational_technology|1                         |
|Gabriel Weimann|political_communication##terrorism##media_effects                |0                         |
+---------------+-----------------------------------------------------------------+--------------------------+



### Top 10 Vaccine weekly 1st does distribution states

In [45]:
# print sample
df_covid.show(n=2, truncate=False)
# group by average
df_avg_1 = df_covid.groupby("jurisdiction")\
  .agg(F.avg("1st Dose Allocations")
  .alias("avg"))\
  .sort(F.col("avg").desc())\
  .toDF("state", "avg")

+------------+-------------------+--------------------+--------------------+
|Jurisdiction|Week of Allocations|1st Dose Allocations|2nd Dose Allocations|
+------------+-------------------+--------------------+--------------------+
|Connecticut |06/21/2021         |54360               |54360               |
|Maine       |06/21/2021         |21420               |21420               |
+------------+-------------------+--------------------+--------------------+
only showing top 2 rows



In [46]:
print("Top 10 states by 1st dose covid vaccine distribution")
df_avg_1.show(n=10)
print(type(df_avg_1))

Top 10 states by 1st dose covid vaccine distribution
+----------------+------------------+
|           state|               avg|
+----------------+------------------+
|      California|440477.14285714284|
|           Texas|301665.53571428574|
|         Florida|240923.57142857142|
|Federal Entities|176817.32142857142|
|            Ohio|         132558.75|
|    Pennsylvania|         130563.75|
|        New York|128911.60714285714|
|  North Carolina|115493.57142857143|
|        Illinois|          114705.0|
|         Georgia|114635.35714285714|
+----------------+------------------+
only showing top 10 rows

<class 'pyspark.sql.dataframe.DataFrame'>


In [47]:
# write top 10 by average corona weekly vaccine states
df_avg_1.limit(10).coalesce(1).write.mode("overwrite").option("header", True).option("quoteAll", True).csv(path_out_avg)

In [48]:
from os import listdir
import os

In [51]:
def find_csv_filenames(path_to_dir, suffix=".csv"):
  """ return list of filenames that ends with suffix """
  filenames = listdir(path_to_dir)
  return [filename for filename in filenames if filename.endswith(suffix)]

# get filename that just wrote (csv file name)
path_csv_file_path = path_out_avg + "/" + find_csv_filenames(path_out_avg)[0]
path_new_file = path_out_avg + "/" + "top_10_states.csv"

# old file name and new file name
print(f"path_csv_file_path: {path_csv_file_path} \n path_new_file: {path_new_file}")
# rename file
os.rename(path_csv_file_path, path_new_file)

path_csv_file_path: /content/drive/MyDrive/PysparkSource/resultstop_10_corona+dist/part-00000-6757c8b1-d34f-4ed0-bcbb-3dfc0a6f8148-c000.csv 
 path_new_file: /content/drive/MyDrive/PysparkSource/resultstop_10_corona+dist/top_10_states.csv


In [52]:
df_in_state = pd.read_csv(path_new_file)
# sample
df_in_state.head(5)

Unnamed: 0,state,avg
0,California,440477.142857
1,Texas,301665.535714
2,Florida,240923.571429
3,Federal Entities,176817.321429
4,Ohio,132558.75


In [53]:
# calculate avg for each state (the input file already have avg)
# just example tp use pandas to do same operation of average
df_in_top10 = df_in_state.groupby("state")["avg"].mean().to_frame("avg").reset_index()
df_in_top10.head(5)

Unnamed: 0,state,avg
0,California,440477.142857
1,Federal Entities,176817.321429
2,Florida,240923.571429
3,Georgia,114635.357143
4,Illinois,114705.0


In [54]:
path_new_file_pd = path_out_avg + "/" + "top_10_states_pandas.csv"
print(f"output to {path_new_file_pd}")
# write pandas df to csv
df_in_top10.to_csv(path_new_file_pd)

output to /content/drive/MyDrive/PysparkSource/resultstop_10_corona+dist/top_10_states_pandas.csv


## Spark: Average of 1st and 2nd DOSE

In [55]:
# for each state, calculate average # 1st dose, average 2nd dose
print(list(df_covid))

[Column<'Jurisdiction'>, Column<'Week of Allocations'>, Column<'1st Dose Allocations'>, Column<'2nd Dose Allocations'>]


In [57]:
# calculate average weekly 1st dose vaccine distribution
df_avg = df_covid.groupby(F.lower("jurisdiction").alias("state"))\
          .agg(F.avg("1st Dose Allocations").alias("avg_1"), \
               F.avg("2nd Dose Allocations").alias("avg_2"), \
               F.sum("1st Dose Allocations").alias("sum_1"), \
               F.avg("2nd Dose Allocations").alias("sum_2"), \
               ).sort(F.col("avg_1").desc())

In [58]:
df_avg.show(15)

+----------------+------------------+------------------+----------+------------------+
|           state|             avg_1|             avg_2|     sum_1|             sum_2|
+----------------+------------------+------------------+----------+------------------+
|      california|440477.14285714284|440477.14285714284|1.233336E7|440477.14285714284|
|           texas|301665.53571428574|301665.53571428574| 8446635.0|301665.53571428574|
|         florida|240923.57142857142|240923.57142857142| 6745860.0|240923.57142857142|
|federal entities|176817.32142857142|174212.67857142858| 4950885.0|174212.67857142858|
|            ohio|         132558.75|         132558.75| 3711645.0|         132558.75|
|    pennsylvania|         130563.75|         130563.75| 3655785.0|         130563.75|
|        new york|128911.60714285714|128911.60714285714| 3609525.0|128911.60714285714|
|  north carolina|115493.57142857143|115493.57142857143| 3233820.0|115493.57142857143|
|        illinois|          114705.0|      

## Spark: equi join first covid dataset and second covid dataset

In [59]:
print(df_covid2.show(2))
# groupby Sex
df_cases = df_covid2.groupby(F.lower("state").alias("state")).agg(F.sum("deaths").alias("sum_deaths"), F.sum("cases").alias("sum_cases"))

+----------+---------+----------+-----+-----+------+
|      date|   county|     state| fips|cases|deaths|
+----------+---------+----------+-----+-----+------+
|2020-01-21|Snohomish|Washington|53061|    1|     0|
|2020-01-22|Snohomish|Washington|53061|    1|     0|
+----------+---------+----------+-----+-----+------+
only showing top 2 rows

None


In [61]:
df_cases.show(n=3)

+-------------+----------+------------+
|        state|sum_deaths|   sum_cases|
+-------------+----------+------------+
|west virginia| 2077693.0|1.35395313E8|
|new hampshire|  913568.0| 7.9409942E7|
|      alabama| 7294870.0|4.23771271E8|
+-------------+----------+------------+
only showing top 3 rows



## Spark inner join example & left join example

In [62]:
# get total distinct states from covid (modrena) dataset and covid (ny) dataset
c1 = df_avg.select("state").distinct().count()
c2 = df_cases.select("state").distinct().count()
print(f"c1: {c1} c2: {c2}")

c1: 63 c2: 56


In [63]:
# create an alias for each of the Dataframe to be joined
df_m = df_avg.alias("df_m")
df_ny = df_cases.alias("df_ny")

In [65]:
print("EQUI JOIN")
# EQUI JOIn / INNER JOIN => only matched on both side (DATAFRAME) on column "state"
df_inner = df_m.join(df_ny, F.col("df_m.state") == F.col("df_ny.state"), 'inner')

EQUI JOIN


In [67]:
lst_interest = ["df_m.state", "df_ny.state", "df_m.avg_1", "df_m.avg_2", "df_ny.sum_deaths", "df_ny.sum_cases"]
# print all states
df_inner.select(*lst_interest).show(n=5, truncate=False)

+--------------+--------------+------------------+------------------+-----------+-------------+
|state         |state         |avg_1             |avg_2             |sum_deaths |sum_cases    |
+--------------+--------------+------------------+------------------+-----------+-------------+
|west virginia |west virginia |21733.928571428572|21733.928571428572|2077693.0  |1.35395313E8 |
|new hampshire |new hampshire |16264.285714285714|16264.285714285714|913568.0   |7.9409942E7  |
|alabama       |alabama       |55455.0           |55455.0           |7294870.0  |4.23771271E8 |
|new york      |new york      |128911.60714285714|128911.60714285714|3.5976831E7|1.546095621E9|
|american samoa|american samoa|1518.2142857142858|0.0               |624.0      |268683.0     |
+--------------+--------------+------------------+------------------+-----------+-------------+
only showing top 5 rows



In [68]:
# total distinct state count
c_inner = df_inner.select("df_m.state").distinct().count()
print(f"Total # states after inner join: {c_inner}")

Total # states after inner join: 54


In [69]:
print("LEFT JOIN")
# LEFT JOIN => all rows on the left side table, right side table not having matched values will be "null"
df_left = df_m.join(df_ny, F.col("df_m.state") == F.col("df_ny.state"), 'left')
df_left.show(n=5)

# total distinct states count
c_left = df_left.select("df_m.state").distinct().count()
print(f"Total # states after inner join: {c_left}")

LEFT JOIN
+---------------+------------------+------------------+---------+------------------+-------------+-----------+-------------+
|          state|             avg_1|             avg_2|    sum_1|             sum_2|        state| sum_deaths|    sum_cases|
+---------------+------------------+------------------+---------+------------------+-------------+-----------+-------------+
|  west virginia|21733.928571428572|21733.928571428572| 608550.0|21733.928571428572|west virginia|  2077693.0| 1.35395313E8|
|  new hampshire|16264.285714285714|16264.285714285714| 455400.0|16264.285714285714|new hampshire|   913568.0|  7.9409942E7|
|mariana islands|1518.2142857142858|               0.0|  42510.0|               0.0|         NULL|       NULL|         NULL|
|        alabama|           55455.0|           55455.0|1552740.0|           55455.0|      alabama|  7294870.0| 4.23771271E8|
|       new york|128911.60714285714|128911.60714285714|3609525.0|128911.60714285714|     new york|3.5976831E7|1.546