## T1.

Write an AWS Command Line call to `aws emr create-cluster` that:
- launches a cluster of 3 m5.xlarge machines (one name node and two worker nodes);
- installs the Spark, Ganglia, and JupyterHub applications;
- adds an additional name-node security group that opens the pertinent ports (JupyterHub on 9443, SSH, and the Spark History Server);
- uses a key pair for secure access;
- runs a bootstrap script stored in AWS S3 (code for that bootstrap script is below).

### AWS CLI code for initializing cluster:

aws emr create-cluster \
  --auto-scaling-role EMR_AutoScaling_DefaultRole \
  --applications Name=Hive Name=JupyterHub Name=Ganglia Name=Spark \
  --ebs-root-volume-size 10 \
  --ec2-attributes '{"KeyName":"aws-key1","InstanceProfile":"EMR_EC2_DefaultRole","SubnetId":"subnet-05f8732b","EmrManagedSlaveSecurityGroup":"sg-0314eaf4b5c5d861d","EmrManagedMasterSecurityGroup":"sg-0314eaf4b5c5d861d"}' \
  --service-role EMR_DefaultRole \
  --enable-debugging \
  --release-label emr-5.19.0 \
  --log-uri 's3n://aws-logs-253161286339-us-east-1/elasticmapreduce/' \
  --name 'lsdm_cluster_ps3_vf' \
  --instance-groups '[{"InstanceCount":1,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"MASTER","InstanceType":"m5.xlarge","Name":"Master - 1"},{"InstanceCount":2,"EbsConfiguration":{"EbsBlockDeviceConfigs":[{"VolumeSpecification":{"SizeInGB":32,"VolumeType":"gp2"},"VolumesPerInstance":1}]},"InstanceGroupType":"CORE","InstanceType":"m5.xlarge","Name":"Core - 2"}]' \
  --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
  --region us-east-1 \
  --bootstrap-actions Path=s3://lsdmghosevf/lsdm_ps3_bash.sh
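The command above attaches only the EMR-managed security groups, while the task also asks for an additional name-node security group that opens JupyterHub (9443), SSH (22), and the Spark History Server (18080, its default port). A minimal sketch, assuming the group already exists and boto3 is installed with AWS credentials configured: build the ingress rules, apply them with EC2's `authorize_security_group_ingress`, then pass the group ID through the `AdditionalMasterSecurityGroups` key of `--ec2-attributes`. The helper names, the CIDR, and the group ID are hypothetical.

```python
# Hypothetical helpers for the additional name-node (master) security group.
# Ports: 22 (SSH), 9443 (JupyterHub on EMR), 18080 (Spark History Server default).
MASTER_PORTS = {"ssh": 22, "jupyterhub": 9443, "spark_history": 18080}


def build_ingress_rules(cidr, ports=MASTER_PORTS):
    """Return an IpPermissions list in the shape boto3's EC2 client expects."""
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "IpRanges": [{"CidrIp": cidr, "Description": name}],
        }
        for name, port in sorted(ports.items(), key=lambda kv: kv[1])
    ]


def open_master_ports(group_id, cidr, region="us-east-1"):
    """Apply the rules to an existing security group (needs boto3 and credentials)."""
    import boto3  # imported lazily so the rule builder works without boto3

    ec2 = boto3.client("ec2", region_name=region)
    return ec2.authorize_security_group_ingress(
        GroupId=group_id, IpPermissions=build_ingress_rules(cidr)
    )
```

Restricting the CIDR to your own address (e.g. a `/32`) rather than `0.0.0.0/0` keeps these ports closed to the rest of the internet.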

## T2

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import lower, col

sc.setJobGroup("001", "PS3 Job1")

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|---|---|---|---|---|---|---|
| 1 | application_1541988201363_0002 | pyspark3 | idle | Link | Link | ✔ |

SparkSession available as 'spark'.


## Use the SparkSession.builder to set spark.executor.instances to 4, spark.executor.memory to '6G', and spark.executor.cores to 2

In [2]:
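# Executor settings are read when the application starts; setting them on a running session does not resize the executors that already exist.
spark.conf.set("spark.executor.instances", 4)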

In [3]:
spark.conf.set("spark.executor.memory", '6G')

In [4]:
spark.conf.set("spark.executor.cores", 2)
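The heading asks for `SparkSession.builder`, whereas the cells above call `spark.conf.set` on a session that Livy has already started. A minimal sketch of the builder form, assuming a fresh application in which no session exists yet (in this Livy/sparkmagic notebook, the equivalent should be a sparkmagic `%%configure -f` cell with `numExecutors`, `executorMemory`, and `executorCores`, run before the session starts). The helper `yarn_container_mb` is hypothetical; it applies Spark's default executor memory overhead on YARN (10% of executor memory, at least 384 MB) to show how much memory YARN is asked for per 6G executor.

```python
# Executor layout requested in the task (values as strings, as SparkConf stores them).
EXECUTOR_CONF = {
    "spark.executor.instances": "4",
    "spark.executor.memory": "6g",
    "spark.executor.cores": "2",
}


def yarn_container_mb(executor_memory_mb, overhead_factor=0.10, min_overhead_mb=384):
    """YARN container size per executor: heap plus Spark's default memory overhead."""
    overhead = max(int(executor_memory_mb * overhead_factor), min_overhead_mb)
    return executor_memory_mb + overhead


def build_session(app_name="PS3"):
    """Create a session with the executor settings (only effective for a new application)."""
    from pyspark.sql import SparkSession  # lazy import; requires pyspark

    builder = SparkSession.builder.appName(app_name)
    for key, value in EXECUTOR_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, `yarn_container_mb(6144)` gives 6758 MB per executor, so four executors ask YARN for 27032 MB in total, which is worth comparing with the memory YARN offers on the two m5.xlarge worker nodes.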

## Load in all three csv files into Spark

In [5]:
d1 = spark.read.option('header', 'true').option('inferSchema', 'true').csv('s3://lsdm-emr-util/lsdm-data/dime_recipients_all_1979_2014.csv')

In [6]:
d2 = spark.read.option('header', 'true').option('inferSchema', 'true').csv('s3://lsdm-emr-util/lsdm-data/vote_db.csv')

In [7]:
d3 = spark.read.option('header', 'true').option('inferSchema', 'true').csv('s3://lsdm-emr-util/lsdm-data/text_db.csv.gz')

## Alter column names for all dataframes to replace `.` with `_`

In [8]:
sc.setJobGroup("002", "PS3 Job2")
def alter_col_names(df):
    # Replace every '.' in a column name with '_' (dotted names otherwise need backtick quoting in Spark)
    newColList = [c.replace('.', '_') for c in df.columns]
    for oldCol, newCol in zip(df.columns, newColList):
        df = df.withColumnRenamed(oldCol, newCol)
    return df
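The loop above issues one `withColumnRenamed` per column, and each call adds another projection to the logical plan. A sketch of a one-shot alternative using PySpark's `DataFrame.toDF(*names)`, which assigns new names positionally; `underscore_columns` is a hypothetical helper split out so the renaming rule can be checked without a Spark session.

```python
def underscore_columns(columns):
    """Replace every '.' in each column name with '_'."""
    return [c.replace(".", "_") for c in columns]


def alter_col_names(df):
    """Rename all columns of a PySpark DataFrame in a single projection."""
    return df.toDF(*underscore_columns(df.columns))
```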

In [9]:
d1 = alter_col_names(d1)

In [10]:
d2 = alter_col_names(d2)

In [11]:
d3 = alter_col_names(d3)

In [12]:
d1

DataFrame[election: string, cycle: string, fecyear: string, Cand_ID: string, FEC_ID: string, NID: string, ICPSR: string, ICPSR2: string, bonica_rid: string, bonica_cid: bigint, name: string, lname: string, ffname: string, fname: string, mname: string, nname: string, title: string, suffix: string, party: string, state: string, seat: string, district: string, Incum_Chall: string, recipient_cfscore: double, contributor_cfscore: double, recipient_cfscore_dyn: double, dwnom1: double, dwnom2: double, ps_dwnom1: double, ps_dwnom2: double, dwdime: double, irt_cfscore: double, num_givers: int, num_givers_total: int, n_data_points_personal_donations: int, n_data_points_personal_donations_unq: int, cand_gender: string, total_disbursements: double, total_pc_contribs: string, contribs_from_candidate: double, unitemized: double, non_party_ind_exp_for: decimal(10,0), non_party_ind_exp_against: string, ind_exp_for: decimal(10,0), ind_exp_against: int, comm_cost_for: int, comm_cost_against: int, party_

In [13]:
d2

DataFrame[vote_id: string, bill_id: string, bonica_rid: string, Name: string, party: int, sponsor: int, cosponsor: int, vote_date: string, vote_choice: int, vs_idealPoint: double, vs_cuttingpoint: double, vs_rcdir: int]

In [14]:
d3

DataFrame[doc_id: string, doc_type: string, bonica_rid: string, bill_id: string, sponsor_rid: string, page_id: string, congno: int, legis_body: string, training_set: int, date: timestamp, text: string, stemmed_text: string, doc_labels: string, tw_latent1: double, tw_abortion_and_social_conservatism: double, tw_agriculture: double, tw_banking_and_finance: double, tw_civil_rights: double, tw_congress_and_procedural: double, tw_crime: double, tw_defense_and_foreign_policy: double, tw_economy: double, tw_education: double, tw_energy: double, tw_environment: double, tw_fair_elections: double, tw_federal_agencies_and_gov_regulation: double, tw_guns: double, tw_healthcare: double, tw_higher_education: double, tw_immigration: double, tw_indian_affairs: double, tw_intelligence_and_surveillance: double, tw_labor: double, tw_law_courts_and_judges: double, tw_transportation: double, tw_veterans_affairs: double, tw_womens_issues: double]

In [17]:
d1 = d1.withColumnRenamed("bonica_rid", "bonica_rid_d1")

In [18]:
d2 = d2.withColumnRenamed("bonica_rid", "bonica_rid_d2")

In [19]:
d3 = d3.withColumnRenamed("bonica_rid", "bonica_rid_d3")

## Merge three (of the four possible) dataframes together

In [20]:
sc.setJobGroup("003", "PS3 Job3")
merged_df = d1.join(d2, d1.bonica_rid_d1 == d2.bonica_rid_d2,how='left')

In [21]:
merged_df = merged_df.join(d3, merged_df.bonica_rid_d2 == d3.bonica_rid_d3,how='left')

In [22]:
merged_df

DataFrame[election: string, cycle: string, fecyear: string, Cand_ID: string, FEC_ID: string, NID: string, ICPSR: string, ICPSR2: string, bonica_rid_d1: string, bonica_cid: bigint, name: string, lname: string, ffname: string, fname: string, mname: string, nname: string, title: string, suffix: string, party: string, state: string, seat: string, district: string, Incum_Chall: string, recipient_cfscore: double, contributor_cfscore: double, recipient_cfscore_dyn: double, dwnom1: double, dwnom2: double, ps_dwnom1: double, ps_dwnom2: double, dwdime: double, irt_cfscore: double, num_givers: int, num_givers_total: int, n_data_points_personal_donations: int, n_data_points_personal_donations_unq: int, cand_gender: string, total_disbursements: double, total_pc_contribs: string, contribs_from_candidate: double, unitemized: double, non_party_ind_exp_for: decimal(10,0), non_party_ind_exp_against: string, ind_exp_for: decimal(10,0), ind_exp_against: int, comm_cost_for: int, comm_cost_against: int, par
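One plausible reason the grouped counts later reach hundreds of millions of rows (e.g. 513,770,614 for `fdfd2014`) is how left outer joins behave: every left row survives, and a left row matching k right rows becomes k rows, so joining candidates to votes and then to texts can multiply rows per candidate. The plain-Python model below mimics those semantics (null keys never match, as in Spark); `left_join` and the tiny tables are hypothetical.

```python
from collections import defaultdict


def left_join(left, right, left_key, right_key):
    """Left outer equi-join of two lists of dicts; null keys never match."""
    index = defaultdict(list)
    for row in right:
        if row[right_key] is not None:
            index[row[right_key]].append(row)
    right_cols = {col for row in right for col in row}
    out = []
    for row in left:
        key = row[left_key]
        matches = index.get(key, []) if key is not None else []
        if matches:
            out.extend({**row, **match} for match in matches)  # one output row per match
        else:
            out.append({**row, **dict.fromkeys(right_cols)})  # unmatched: right side is null
    return out
```

Note that, as in the notebook, the second join keys on the column coming from the first right table, so candidates without votes carry a null key and cannot match any text.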

## Write the same grouped aggregation in PySpark and then again in Spark SQL. Run .explain() on the resulting aggregations - what does this tell you about the plans for the two dataframes?

In [23]:
sc.setJobGroup("004", "PS3 Job4")
from pyspark.sql.functions import count, avg
g_df = merged_df.groupBy("election", "cycle").agg(avg("total_disbursements"), count("*"))

In [24]:
g_df.show(10)

+--------------------+-----+------------------------+---------+
|            election|cycle|avg(total_disbursements)| count(1)|
+--------------------+-----+------------------------+---------+
|                1996| 1996|                    null| 47665388|
|                2018| 2018|                    null|   226107|
|                2012| 2012|                    null|  8771662|
|                  9"|    C|                    null|        1|
|            fdfd2016| 2016|       927530.2371189878|469952690|
|            fdfd2014| 2014|      1745129.8572086587|513770614|
|",,128224,0,0,0,0...|    1|                    null|        1|
|            fdfd2006| 2006|       1412034.249539594|512123250|
|                1990| 1990|                    null|  7688242|
|                2004| 2004|                    null| 29082686|
+--------------------+-----+------------------------+---------+
only showing top 10 rows

In [25]:
g_df.explain()

== Physical Plan ==
*(10) HashAggregate(keys=[election#10, cycle#11], functions=[avg(total_disbursements#3053), count(1)])
+- Exchange hashpartitioning(election#10, cycle#11, 200)
   +- *(9) HashAggregate(keys=[election#10, cycle#11], functions=[partial_avg(total_disbursements#3053), partial_count(1)])
      +- *(9) Project [election#10, cycle#11, total_disbursements#3053]
         +- SortMergeJoin [bonica_rid_d2#7629], [bonica_rid_d3#7642], LeftOuter
            :- *(6) Sort [bonica_rid_d2#7629 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(bonica_rid_d2#7629, 200)
            :     +- *(5) Project [election#10, cycle#11, total_disbursements#3053, bonica_rid_d2#7629]
            :        +- SortMergeJoin [bonica_rid_d1#7554], [bonica_rid_d2#7629], LeftOuter
            :           :- *(2) Sort [bonica_rid_d1#7554 ASC NULLS FIRST], false, 0
            :           :  +- Exchange hashpartitioning(bonica_rid_d1#7554, 200)
            :           :     +- *(1) Proj

In [26]:
merged_df.createOrReplaceTempView("merged_df")
spark_sql_df = spark.sql("SELECT election, cycle, avg(total_disbursements), count(*) from merged_df group by election, cycle")

In [27]:
spark_sql_df.show(10)

+--------------------+-----+------------------------+---------+
|            election|cycle|avg(total_disbursements)| count(1)|
+--------------------+-----+------------------------+---------+
|                2018| 2018|                    null|   226107|
|                1996| 1996|                    null| 47665388|
|                2012| 2012|                    null|  8771662|
|                  9"|    C|                    null|        1|
|            fdfd2016| 2016|       927530.2371189882|469952690|
|            fdfd2014| 2014|       1745129.857208658|513770614|
|",,128224,0,0,0,0...|    1|                    null|        1|
|            fdfd2006| 2006|       1412034.249539594|512123250|
|                1990| 1990|                    null|  7688242|
|                2004| 2004|                    null| 29082686|
+--------------------+-----+------------------------+---------+
only showing top 10 rows

In [28]:
spark_sql_df.explain()

== Physical Plan ==
*(10) HashAggregate(keys=[election#10, cycle#11], functions=[avg(total_disbursements#3053), count(1)])
+- Exchange hashpartitioning(election#10, cycle#11, 200)
   +- *(9) HashAggregate(keys=[election#10, cycle#11], functions=[partial_avg(total_disbursements#3053), partial_count(1)])
      +- *(9) Project [election#10, cycle#11, total_disbursements#3053]
         +- SortMergeJoin [bonica_rid_d2#7629], [bonica_rid_d3#7642], LeftOuter
            :- *(6) Sort [bonica_rid_d2#7629 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(bonica_rid_d2#7629, 200)
            :     +- *(5) Project [election#10, cycle#11, total_disbursements#3053, bonica_rid_d2#7629]
            :        +- SortMergeJoin [bonica_rid_d1#7554], [bonica_rid_d2#7629], LeftOuter
            :           :- *(2) Sort [bonica_rid_d1#7554 ASC NULLS FIRST], false, 0
            :           :  +- Exchange hashpartitioning(bonica_rid_d1#7554, 200)
            :           :     +- *(1) Proj

### What does this tell you about the plans for the two dataframes?

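Running .explain() on the two grouped aggregations prints their physical plans, i.e. the ordered set of operators Spark executes to read, join, and aggregate the data. Apart from the truncated tail, the two plans are identical, down to the expression IDs (e.g. `election#10`, `total_disbursements#3053`): the PySpark DataFrame API and Spark SQL compile to the same logical plan, which the Catalyst optimizer turns into the same physical plan. Both perform two left outer SortMergeJoins, each fed by an `Exchange hashpartitioning` shuffle and a Sort on the join key, followed by a two-phase aggregation: a partial `HashAggregate` (`partial_avg`, `partial_count`) before the `Exchange hashpartitioning(election#10, cycle#11, 200)` shuffle and a final `HashAggregate` after it. The only differences in the results are the row order of `show()` (neither query sorts its output) and the last digits of some averages (e.g. 927530.2371189878 vs. 927530.2371189882), which is consistent with floating-point values being summed in a different order across partitions.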
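Both plans split the aggregation around the shuffle: `partial_avg`/`partial_count` run before the `Exchange` and the final `avg`/`count` after it. A plain-Python model of that split for `avg`, with hypothetical helpers (Spark's own aggregate buffers differ in detail): each partition keeps a (sum, count) pair per key, only those small pairs are shuffled, and the final step merges them and divides.

```python
def partial_avg(partition):
    """Phase 1, per partition: (sum, count) per key, skipping nulls like SQL avg."""
    acc = {}
    for key, value in partition:
        s, n = acc.get(key, (0.0, 0))
        if value is not None:
            s, n = s + value, n + 1
        acc[key] = (s, n)
    return acc


def final_avg(partials):
    """Phase 2, after the shuffle: merge the (sum, count) pairs per key, then divide."""
    merged = {}
    for acc in partials:
        for key, (s, n) in acc.items():
            ms, mn = merged.get(key, (0.0, 0))
            merged[key] = (ms + s, mn + n)
    # A key whose values are all null averages to null, like the null rows in the output.
    return {key: (s / n if n else None) for key, (s, n) in merged.items()}
```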

## Run a windowing aggregation

In [29]:
sc.setJobGroup("005", "PS3 Job5")
from pyspark.sql.window import Window

In [30]:
w = Window.partitionBy("election").orderBy("contributor_cfscore").rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [31]:
from pyspark.sql.functions import rank, min as spark_min  # alias avoids shadowing Python's built-in min

In [32]:
win_df = merged_df.select(rank().over(w), spark_min('total_disbursements').over(w))

In [33]:
win_df.show(1)

+---------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
|RANK() OVER (PARTITION BY election ORDER BY contributor_cfscore ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|min(total_disbursements) OVER (PARTITION BY election ORDER BY contributor_cfscore ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)|
+---------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                1|                             
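The window above computes, within each `election`, a `RANK()` by ascending `contributor_cfscore` and a running minimum of `total_disbursements` over the frame from the first row up to the current row. A plain-Python model of those semantics, with a hypothetical helper that assumes the partition and ordering columns contain no nulls (Spark would sort nulls first): tied ordering values share a rank and the next distinct value jumps to its row position, and `min` ignores nulls.

```python
from itertools import groupby


def rank_and_running_min(rows, part_key, order_key, value_key):
    """Model RANK() and MIN(value) OVER (PARTITION BY part_key ORDER BY order_key
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW); assumes no null keys."""
    out = []
    ordered = sorted(rows, key=lambda r: (r[part_key], r[order_key]))  # stable for ties
    for _, group in groupby(ordered, key=lambda r: r[part_key]):
        current_rank, prev, running_min = 0, object(), None
        for position, row in enumerate(group, start=1):
            if row[order_key] != prev:  # new ORDER BY value: rank jumps to row position
                current_rank, prev = position, row[order_key]
            value = row[value_key]
            if value is not None and (running_min is None or value < running_min):
                running_min = value  # MIN ignores nulls
            out.append({**row, "rank": current_rank, "running_min": running_min})
    return out
```

Because the notebook's `select` keeps only the two window columns, adding `election` and `contributor_cfscore` to it makes the window output easier to read.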

## Run a User-Defined-Function (UDF), using Python's nltk package to analyze the text data in the Congressional Text dataframe.

In [34]:
sc.setJobGroup("006", "PS3 Job6")
from pyspark.sql.functions import udf

In [35]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /var/lib/livy/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
True

In [36]:
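def nltk_count_text(text, phrase):
    # Count how often the token `phrase` appears in one cell of a text column (0 for nulls).
    # Assumes nltk and its punkt data are installed on every executor, e.g. via the bootstrap script.
    if text is None:
        return 0
    tokens = nltk.word_tokenize(str(text))
    return nltk.Text(tokens).count(phrase)  # return, not print, so the value reaches the UDF column

In [41]:
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import IntegerType
udf_nltk_count_text = udf(nltk_count_text, IntegerType())

In [40]:
# The UDF takes two columns; "Senate" is a hypothetical example phrase passed as a literal.
d3.select(udf_nltk_count_text(col("legis_body"), lit("Senate"))).show()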

In [37]:
# def list_freq_legis(df):
#     leg_list = df.select('legis_body').collect()
#     print(FreqDist(leg_list))

In [42]:
# from pyspark.sql.functions import col
# d3.select(udf_nltk_count_text(col("legis_body"))).show()

In [38]:
# from pyspark.sql.functions import udf
# udf_list_freq_legis = udf(list_freq_legis)

In [39]:
# d3.select(udf_list_freq_legis()).show()

## Collect a sample or aggregation of the data to the name node as a Pandas dataframe and print some of the rows of the collected dataframe.

In [43]:
sc.setJobGroup("007", "PS3 Job7")
sample_df = merged_df.limit(2)

In [44]:
plot_sample = sample_df.collect()

In [45]:
print(plot_sample)

[Row(election='2010', cycle='2010', fecyear='2010', Cand_ID='WV130349', FEC_ID=None, NID=None, ICPSR='WV1303492010', ICPSR2='WV130349', bonica_rid_d1='cand118439', bonica_cid=3822466047, name='tucker, gregory a', lname='tucker', ffname='gregory a', fname='gregory', mname='a', nname=None, title=None, suffix=None, party='100', state='WV', seat='state:upper', district='WV-11', Incum_Chall='C', recipient_cfscore=-0.022, contributor_cfscore=-0.187, recipient_cfscore_dyn=-0.07, dwnom1=None, dwnom2=None, ps_dwnom1=None, ps_dwnom2=None, dwdime=-0.266, irt_cfscore=None, num_givers=100, num_givers_total=143, n_data_points_personal_donations=12, n_data_points_personal_donations_unq=12, cand_gender='M', total_disbursements=None, total_pc_contribs='0', contribs_from_candidate=26200.0, unitemized=0.0, non_party_ind_exp_for=Decimal('0'), non_party_ind_exp_against='0', ind_exp_for=Decimal('0'), ind_exp_against=0, comm_cost_for=0, comm_cost_against=0, party_coord_exp=Decimal('0'), party_ind_exp_against
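The cell above calls `collect()`, which returns a Python list of `Row` objects rather than a Pandas dataframe. PySpark's `DataFrame.toPandas()` performs the collection and conversion in one step (e.g. `merged_df.limit(2).toPandas()`, which needs pandas on the driver). For rows that were already collected, a minimal sketch of the conversion, with `rows_to_pandas` as a hypothetical helper:

```python
import pandas as pd


def rows_to_pandas(rows):
    """Build a pandas DataFrame from collected PySpark Rows (or plain dicts)."""
    records = [r.asDict() if hasattr(r, "asDict") else dict(r) for r in rows]
    return pd.DataFrame(records)
```

For example, `rows_to_pandas(plot_sample).head()` would print the two sampled rows as a table.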

## Task 3. Conceptual Analysis - Identify and describe the following:

References:
- https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
- https://stackoverflow.com/questions/34580662/what-does-stage-skipped-mean-in-apache-spark-web-ui
- https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-whole-stage-codegen.html

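### One shuffle between two stages

A shuffle is the redistribution of data across partitions between two stages: each task of the upstream stage writes its output split by key (the shuffle write), and each task of the downstream stage fetches the pieces that belong to its partition from every upstream task (the shuffle read). In the physical plans above, every `Exchange hashpartitioning(...)` operator marks a shuffle; for example, `Exchange hashpartitioning(election#10, cycle#11, 200)` moves the partial aggregates into 200 partitions keyed by (election, cycle) between the partial and final `HashAggregate` steps.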
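A plain-Python model of a hash-partitioned shuffle, with partitions as lists of (key, value) pairs; the function names are hypothetical and this is not Spark's code. Spark's `hashpartitioning` uses its own Murmur3-based hash, so `zlib.crc32` stands in here only because it is deterministic across runs.

```python
import zlib


def shuffle_write(task_output, num_partitions):
    """Upstream task: split its (key, value) output into one bucket per downstream partition."""
    buckets = [[] for _ in range(num_partitions)]
    for key, value in task_output:
        buckets[zlib.crc32(str(key).encode("utf-8")) % num_partitions].append((key, value))
    return buckets


def shuffle_read(written, partition_id):
    """Downstream task: fetch its bucket from every upstream task's output."""
    return [rec for buckets in written for rec in buckets[partition_id]]


def shuffle(map_outputs, num_partitions):
    written = [shuffle_write(out, num_partitions) for out in map_outputs]
    return [shuffle_read(written, p) for p in range(num_partitions)]
```

After the shuffle, all records for a given key sit in exactly one downstream partition, which is what lets the final aggregation or join run locally.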

### One narrow transformation

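A narrow transformation, such as `map` or `filter`, computes each output partition from a single partition of its parent, so it runs inside a stage without moving data between executors. The `Project` steps in the plans above (selecting columns row by row within each partition) are an example.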


### One wide transformation

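A wide transformation, such as `groupByKey` or `reduceByKey` (or `groupBy(...).agg(...)` on a DataFrame), computes each output partition from data spread across many parent partitions, so the records for each key must first be brought together by a shuffle, unlike with a narrow transformation. The `groupBy("election", "cycle")` aggregation above is a wide transformation, which is why its plan contains an `Exchange` between the partial and final `HashAggregate`.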
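The contrast between narrow and wide transformations can be modelled in plain Python with partitions as lists; the helpers are hypothetical and only mirror the dependency structure of Spark's `map` and `reduceByKey`. `zlib.crc32` is a deterministic stand-in for Spark's key hashing.

```python
import zlib


def owner(key, num_partitions):
    """Partition that owns a key (deterministic stand-in for Spark's hash partitioner)."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def narrow_map(partitions, fn):
    """Narrow: output partition i is computed only from input partition i."""
    return [[fn(rec) for rec in part] for part in partitions]


def wide_reduce_by_key(partitions, fn, num_partitions):
    """Wide: a key's values may sit in any input partition, so every record is
    routed to the partition that owns its key (a shuffle) and combined there."""
    out = [{} for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            bucket = out[owner(key, num_partitions)]
            bucket[key] = fn(bucket[key], value) if key in bucket else value
    return [sorted(bucket.items()) for bucket in out]
```

`narrow_map` never looks outside its own partition, while `wide_reduce_by_key` must read every input partition before any output partition is complete, which is exactly where Spark places a stage boundary.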

### For one Spark job, use the DAG visualization in the Spark History app to examine the tasks and stages, then explain in detail (include a screenshot of the DAG)

In the screenshot attached to the assignment submission, four stages were skipped (stages 77, 78, 79, and 80) while Stage 81 was completed. A stage is shown as skipped when its output, typically the shuffle files written by an earlier job over the same lineage, is still available, so Spark reuses that output instead of recomputing the stage; cached data has the same effect. The completed Stage 81 begins with two Exchange steps, i.e. reads of the shuffled output of the upstream stages. It then contains two WholeStageCodegen steps, in which chains of physical operators are fused into a single generated Java function. Next, a SortMergeJoin joins the two sorted inputs back together on the join key. After that, mapPartitionsInternal processes the joined rows partition by partition, and the stage ends with a map operation.

### Check the storage level of your dataframe. What happens if you checkpoint the data?

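If you checkpoint the data, for example with reliable checkpointing, the current contents of the DataFrame's underlying RDD are written to the checkpoint directory on a reliable distributed file system such as the Hadoop Distributed File System (set beforehand with `sc.setCheckpointDir(...)`), and the lineage is truncated, so later actions start from the saved files instead of recomputing the joins. `df.checkpoint()` returns a new DataFrame backed by those files; unless that new DataFrame is cached, its `storageLevel` reports no memory or disk storage (`StorageLevel(False, False, False, False, 1)`), which is why the storage level of my dataframe appears to go down after checkpointing even if the original DataFrame was persisted.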