## Using Spark

In [None]:
import pandas as pd

In [5]:
import os
import pyspark

In [6]:
from pyspark.sql.functions import mean, stddev, count

In [7]:
from pyspark.sql.functions import split, explode, col, lower

In [8]:
from pyspark.sql.functions import collect_list, concat_ws

In [9]:
from pyspark.sql import SparkSession

In [12]:
#path to environment
os.environ['PYSPARK_PYTHON'] = '/Users/zliao/opt/anaconda3/envs/metis/bin/python'

In [13]:
spark = pyspark.sql.SparkSession.builder.config('spark.driver.memory', '6g').getOrCreate()

### Summary data reporting:

In [None]:
df1 = spark.read.csv('./stats.csv',sep=",", inferSchema="true", header="true")

In [None]:
df1.show(10)

https://sparkbyexamples.com/pyspark/pyspark-groupby-explained-with-example/

In [None]:
stats_vis = df1.groupby(['vax_manu', 'vax_dose_series', 'v_adminby'])\
    .agg(mean('numdays').alias('avg_onset'),\
        stddev('numdays').alias('std_onset'),\
        mean('age_yrs').alias('avg_age'),\
        stddev('age_yrs').alias('std_age'),\
        count('vaers_id').alias('cases'))

In [None]:
stats_vis.show(5)

In [None]:
#examining number of rows
stats_vis.count() 

In [None]:
#save! back to pandas since smaller now

stats_vis_df = stats_vis.toPandas()

In [None]:
stats_vis_df.to_csv('stats_vis.csv')

### Adverse effects reporting:

In [14]:
ades_df = spark.read.csv('./ade.csv',sep=",", inferSchema="true", header="true")

#enforce schema

                                                                                

In [None]:
ades_df.show(10)

https://stackoverflow.com/questions/41788919/concatenating-string-by-rows-in-pyspark

In [15]:
ades_df2 = ades_df.select(['vax_manu', 'vax_dose_series', 'all_symptoms']).groupby(['vax_manu', 'vax_dose_series']).agg(concat_ws(',', collect_list(ades_df.all_symptoms)).alias('symptoms'))

In [16]:
ades_df2.show(5)

                                                                                

+--------------------+---------------+--------------------+
|            vax_manu|vax_dose_series|            symptoms|
+--------------------+---------------+--------------------+
|             MODERNA|              3|Arthralgia, Myalg...|
|             JANSSEN|              4|No adverse event,...|
|     PFIZER\BIONTECH|             5+|Hypoaesthesia, Pa...|
|             JANSSEN|            UNK|Unevaluable event...|
|UNKNOWN MANUFACTURER|              3|Death, , , , ,Pai...|
+--------------------+---------------+--------------------+
only showing top 5 rows



In [17]:
from pyspark.sql.functions import udf
import re

https://stackoverflow.com/questions/46897988/removing-comma-in-a-column-in-pyspark

In [18]:
comma_clean = udf(lambda x: re.sub(" ,","", x))
#clean more in sql

In [19]:
ades_df2 = ades_df2.withColumn('cleaned_symptoms', comma_clean('symptoms'))

In [20]:
ades_df2.show(5)

                                                                                

+--------------------+---------------+--------------------+--------------------+
|            vax_manu|vax_dose_series|            symptoms|    cleaned_symptoms|
+--------------------+---------------+--------------------+--------------------+
|             MODERNA|              3|Arthralgia, Myalg...|Arthralgia, Myalg...|
|             JANSSEN|              4|No adverse event,...|No adverse event,...|
|     PFIZER\BIONTECH|             5+|Hypoaesthesia, Pa...|Hypoaesthesia, Pa...|
|             JANSSEN|            UNK|Unevaluable event...|Unevaluable event...|
|UNKNOWN MANUFACTURER|              3|Death, , , , ,Pai...|Death,Pain in ext...|
+--------------------+---------------+--------------------+--------------------+
only showing top 5 rows





In [24]:
#getting counts for each symptom (within manufacturer and dose series group)
(ades_df2.withColumn('separate', explode(split(lower(col('cleaned_symptoms')), ','))).groupby('separate').count().show(10))



+--------------------+-----+
|            separate|count|
+--------------------+-----+
|angiogram cerebra...|  162|
| arteriogram carotid|  142|
|        heat therapy|   18|
|biopsy skin abnormal|  147|
|         face oedema|   55|
|  swelling of eyelid|   98|
|       eye discharge|  175|
|blood cholesterol...|   31|
| mycobacterium tu...|   49|
|      electromyogram|  530|
+--------------------+-----+
only showing top 10 rows



                                                                                

In [28]:
ades_df3 = ades_df2.withColumn('cleaned_symptoms', explode(split(lower(col('cleaned_symptoms')), ','))).groupby(['vax_manu', 'vax_dose_series','cleaned_symptoms']).agg(count('cleaned_symptoms').alias('symptom_count'))

In [22]:
ades_df3.show()



+---------------+---------------+--------------------+-------------+
|       vax_manu|vax_dose_series|    cleaned_symptoms|symptom_count|
+---------------+---------------+--------------------+-------------+
|PFIZER\BIONTECH|             5+|              tremor|            2|
|PFIZER\BIONTECH|             5+|             aphasia|            2|
|PFIZER\BIONTECH|             5+|      vision blurred|            2|
|PFIZER\BIONTECH|             5+|  cardiac monitoring|            1|
|PFIZER\BIONTECH|             5+|           blindness|            1|
|PFIZER\BIONTECH|              1|           dysstasia|           55|
|PFIZER\BIONTECH|              1|   burning sensation|          904|
|PFIZER\BIONTECH|              1|      prostatomegaly|            8|
|PFIZER\BIONTECH|              1|        hidradenitis|           12|
|PFIZER\BIONTECH|              1|  c-reactive protein|          251|
|PFIZER\BIONTECH|              1|           mydriasis|          146|
|PFIZER\BIONTECH|              1| 

                                                                                

https://stackoverflow.com/questions/38397796/retrieve-top-n-in-each-group-of-a-dataframe-in-pyspark

In [30]:
from pyspark.sql.window import Window

In [31]:
from pyspark.sql.functions import col, row_number

In [41]:
column_list = ["vax_manu","vax_dose_series"]
windowDept = Window.partitionBy([col(x) for x in column_list]).orderBy(col('symptom_count').desc())

In [42]:
ades_df4 = ades_df3.withColumn("row", row_number().over(windowDept)).filter(col("row") <= 10)

In [43]:
ades_df4.show()



+--------+---------------+--------------------+-------------+---+
|vax_manu|vax_dose_series|    cleaned_symptoms|symptom_count|row|
+--------+---------------+--------------------+-------------+---+
| MODERNA|              3|expired product a...|         3622|  1|
| MODERNA|              3|             pyrexia|         3118|  2|
| MODERNA|              3|                pain|         2612|  3|
| MODERNA|              3|            headache|         2592|  4|
| MODERNA|              3|             fatigue|         2189|  5|
| MODERNA|              3|           urticaria|         1918|  6|
| MODERNA|              3|   pain in extremity|         1700|  7|
| MODERNA|              3|              nausea|         1602|  8|
| MODERNA|              3|            pruritus|         1559|  9|
| MODERNA|              3|              chills|         1324| 10|
| JANSSEN|              4| interchange of v...|            5|  1|
| JANSSEN|              4|           dizziness|            4|  2|
| JANSSEN|

                                                                                

In [44]:
#checking number of rows 
ades_df4.count()

                                                                                

230

In [45]:
ades_df4.filter('row = 1').show()

                                                                                

+--------------------+---------------+--------------------+-------------+---+
|            vax_manu|vax_dose_series|    cleaned_symptoms|symptom_count|row|
+--------------------+---------------+--------------------+-------------+---+
|             MODERNA|              3|expired product a...|         3622|  1|
|             JANSSEN|              4| interchange of v...|            5|  1|
|     PFIZER\BIONTECH|             5+|extra dose admini...|           17|  1|
|             JANSSEN|            UNK|             pyrexia|         5301|  1|
|UNKNOWN MANUFACTURER|              3|            dyspnoea|            7|  1|
|UNKNOWN MANUFACTURER|              1|            headache|          139|  1|
|             MODERNA|              2|             pyrexia|        20446|  1|
|             JANSSEN|              1|            headache|         8433|  1|
|UNKNOWN MANUFACTURER|             5+| impaired work ab...|            1|  1|
|             JANSSEN|             5+|            headache|     

In [46]:
#save! back to pandas since smaller now

top_ades_df = ades_df4.toPandas()


                                                                                

In [47]:
top_ades_df.to_csv('top_ades.csv')

Supplemental:

In [None]:
top_ade_df = spark.sql('''
SELECT vax_manu, vax_dose_series, cleaned_symptoms
FROM
(SELECT *, ROW_NUMBER() OVER (PARTITION BY cleaned_symptoms ORDER BY symptom_count DESC) AS n 
FROM ade_counts) AS x
WHERE n <= 10;
''')