# Step 0: Create spark context

In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
spark

In [3]:
sc

## Step 1: Get data with which we are familiar



### Dataset: Green practices across DC
[documentation](https://opendata.dc.gov/datasets/best-management-practices/data)
![rainbarrel](https://cdn11.bigcommerce.com/s-j602wc6a/products/7096/images/21615/great-american-rainbarrel-trio__96479.1502809069.500.750.jpg?c=2)

We want to know:
- what are the most common BMP_Type by neighborhood? 
- How many instances are on private vs public land?

### Step 2: Create a pandas dataframe

In [12]:
import pandas as pd
pandaDf = pd.read_csv('https://opendata.arcgis.com/datasets/a973c2c7b7c14a918859f3e38bdffdd2_42.csv')

  interactivity=interactivity, compiler=compiler, result=result)


In [14]:
pandaDf.head()

Unnamed: 0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
0,-77.036773,38.958677,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-14T00:00:00.000Z,,0,...,#R15760-1-2,,,,MS4,True,True,4.0,No,Yes
1,-76.938687,38.888273,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-20T00:00:00.000Z,,0,...,#R11333-1-12,,,,MS4,True,True,7.0,No,Yes
2,-76.989573,38.928465,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-07T00:00:00.000Z,,220,...,#R12966-1-2,,,,CSS,True,True,5.0,No,Yes
3,-77.031645,38.954937,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-09-01T00:00:00.000Z,,0,...,#R13157-1-6,,,,CSS,True,True,4.0,No,Yes
4,-77.0079,38.8276,4719-1-5,Infiltration trench,,False,Infiltration,2018-06-21T00:00:00.000Z,,9,...,#4719-1-5,4719.0,,2016-06-21T00:00:00.000Z,MS4,True,True,8.0,No,No


In [19]:
pandaDf.groupby(["WARD", "BMP_TYPE"]).BMP_TYPE.count()

WARD  BMP_TYPE                                   
1.0   Bayscaping                                      89
      Bioretention                                    26
      Dry swale                                        1
      Engineered treepits                              1
      Extensive green roof                            32
      Filtering System                                 2
      Filtering Systems                               48
      Green Roof                                       1
      Green roof                                      45
      Infiltration                                    23
      Infiltration basin                               1
      Infiltration trench                             13
      Intensive green roof                            46
      Permeable pavement                              42
      Permeable pavers                                19
      Pervious concrete                                8
      Porous asphalt                  

In [22]:
pd.crosstab(pandaDf.WARD, pandaDf.BMP_TYPE)

BMP_TYPE,Bayscaping,Bioretention,CDA to a Shared BMP,Dry swale,Engineered tree pits,Engineered treepits,Extensive green roof,Filtering System,Filtering Systems,Grass channel,...,Streetscape bioretention,Surface sand filter,Three chamber underground sand filter,Traditional bioretention,Tree planting,Tree preservation,Underground vault,Wet Swale,Wet pond,Wetlands
WARD,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
1.0,89,26,0,1,0,1,32,2,48,0,...,2,0,0,30,18,6,0,3,0,0
2.0,8,34,0,2,0,6,180,14,105,1,...,14,0,0,91,18,8,2,0,0,0
3.0,153,91,2,4,0,5,28,8,49,12,...,0,0,0,16,111,53,0,0,0,1
4.0,393,139,0,16,0,2,21,1,22,1,...,5,0,0,29,141,20,0,0,0,0
5.0,307,161,1,18,0,3,49,3,54,4,...,1,1,0,70,218,10,0,0,0,2
6.0,102,147,1,16,2,117,93,8,102,8,...,42,5,1,64,91,23,5,2,0,1
7.0,229,95,0,10,0,2,7,1,54,10,...,1,0,0,70,39,4,0,0,0,0
8.0,108,113,1,14,0,0,14,3,32,6,...,1,0,3,174,67,11,2,0,1,0


### Step 3: Create spark dataframe

In [30]:
myrdd = sc.textFile('https://opendata.arcgis.com/datasets/a973c2c7b7c14a918859f3e38bdffdd2_42.csv').map(lambda line: line.split(","))
# need to make this a df

In [27]:
type(myrdd)

pyspark.rdd.PipelinedRDD

In [41]:
# import spark.implicits._
# df=myrdd.rdd
# df=spark.createDataFrame(myrdd)
sparkDf = spark.read.csv('Best_Management_Practices.csv', encoding=("UTF-8"))

In [42]:
sparkDf.head()

Row(_c0='X', _c1='Y', _c2='BMP_ID_NUMBER', _c3='BMP_TYPE', _c4='BMP_SUB_TYPE', _c5='UNDERDRAIN', _c6='BMP_GROUP', _c7='INSTALLATION_DATE', _c8='DATE_REMOVED', _c9='CONTRIBUTING_DRAINAGE_AREA_FT2', _c10='POST_PROJECT_NATURAL_FT2', _c11='POST_PROJECT_COMPACTED_FT2', _c12='POST_PROJECT_IMPERVIOUS_FT2', _c13='POST_PROJECT_BMP_AREA_FT2', _c14='POST_PROJECT_VEHICULAR_FT2', _c15='PRE_PROJECT_NATURAL_FT2', _c16='PRE_PROJECT_COMPACTED_FT2', _c17='PRE_PROJECT_IMPERVIOUS_FT2', _c18='PRE_PROJECT_BMP_AREA_FT2', _c19='PRE_PROJECT_VEHICULAR_FT2', _c20='STORAGE_VOLUME_FT3', _c21='RETENTION_VOLUME_FT3', _c22='ADDITIONAL_VOLUME_TREATED_FT3', _c23='X_COORDINATE', _c24='Y_COORDINATE', _c25='MAJOR_DRAINAGE_BASIN', _c26='MINOR_DRAINAGE_BASIN', _c27='NUMBER_OF_TREES', _c28='PROPRIETARY_PRACTICE_NAME', _c29='DESCRIBE_PROPRIETARY_PRACTICE', _c30='PROJECT_TYPE', _c31='GIS_LAST_MOD_DTTM', _c32='LATITUDE', _c33='LONGITUDE', _c34='OBJECTID', _c35='BMP_GROUP_ABBREVIATION', _c36='MAJOR_REGULATED_ACTIVITY', _c37='SRC

In [43]:
sparkDf.persist()

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string, _c14: string, _c15: string, _c16: string, _c17: string, _c18: string, _c19: string, _c20: string, _c21: string, _c22: string, _c23: string, _c24: string, _c25: string, _c26: string, _c27: string, _c28: string, _c29: string, _c30: string, _c31: string, _c32: string, _c33: string, _c34: string, _c35: string, _c36: string, _c37: string, _c38: string, _c39: string, _c40: string, _c41: string, _c42: string, _c43: string, _c44: string, _c45: string, _c46: string, _c47: string, _c48: string, _c49: string, _c50: string]

In [46]:
sparkDf.limit(5).toPandas()
#df.head()
# df.show(5)

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c41,_c42,_c43,_c44,_c45,_c46,_c47,_c48,_c49,_c50
0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
1,-77.036773007625584,38.958676906847835,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-14T00:00:00.000Z,,0,...,#R15760-1-2,,,,MS4,True,True,4,No,Yes
2,-76.938687201532446,38.888273056016203,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-20T00:00:00.000Z,,0,...,#R11333-1-12,,,,MS4,True,True,7,No,Yes
3,-76.989573176813423,38.928464736384932,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-07T00:00:00.000Z,,220,...,#R12966-1-2,,,,CSS,True,True,5,No,Yes
4,-77.031644691361862,38.95493673453641,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-09-01T00:00:00.000Z,,0,...,#R13157-1-6,,,,CSS,True,True,4,No,Yes


In [None]:
# Select columns using DF format

In [47]:
def show(df, n=5):
    return df.limit(n).toPandas()

In [48]:
show(sparkDf, 7)

Unnamed: 0,_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,...,_c41,_c42,_c43,_c44,_c45,_c46,_c47,_c48,_c49,_c50
0,X,Y,BMP_ID_NUMBER,BMP_TYPE,BMP_SUB_TYPE,UNDERDRAIN,BMP_GROUP,INSTALLATION_DATE,DATE_REMOVED,CONTRIBUTING_DRAINAGE_AREA_FT2,...,BMP_NUMBER,PLAN_NUMBER,BMP_TYPE_ACCESS,DATE_APPROVED,SEWERSHED,INCLUDE_BUILT,INCLUDE_ALL,WARD,GREEN_ROOF_REBATE,RIVERSMART_HOMES
1,-77.036773007625584,38.958676906847835,R15760-1-2,Shade Tree,,False,Tree Planting and Preservation,2014-05-14T00:00:00.000Z,,0,...,#R15760-1-2,,,,MS4,True,True,4,No,Yes
2,-76.938687201532446,38.888273056016203,R11333-1-12,Shade Tree,,False,Tree Planting and Preservation,2017-03-20T00:00:00.000Z,,0,...,#R11333-1-12,,,,MS4,True,True,7,No,Yes
3,-76.989573176813423,38.928464736384932,R12966-1-2,Rain Barrel,,False,Rainwater Harvesting,2012-06-07T00:00:00.000Z,,220,...,#R12966-1-2,,,,CSS,True,True,5,No,Yes
4,-77.031644691361862,38.95493673453641,R13157-1-6,Shade Tree,,False,Tree Planting and Preservation,2012-09-01T00:00:00.000Z,,0,...,#R13157-1-6,,,,CSS,True,True,4,No,Yes
5,-77.007900000033715,38.827600000206786,4719-1-5,Infiltration trench,,False,Infiltration,2018-06-21T00:00:00.000Z,,9,...,#4719-1-5,4719,,2016-06-21T00:00:00.000Z,MS4,True,True,8,No,No
6,-77.088417131287017,38.939089213908822,R1346-1-12,Simple disconnection to amended soils,,False,Impervious Surface Disconnection,2013-07-01T00:00:00.000Z,,7133,...,#R1346-1-12,,,,MS4,True,True,3,No,No


In [61]:
# subsample=sparkDf["BMP_TYPE"]
# # show(subsample, n=10)

In [62]:
import pyspark.sql.functions as F
counts = sparkDf.agg(F.countDistinct('BMP_TYPE'))

AnalysisException: "cannot resolve '`BMP_TYPE`' given input columns: [_c24, _c33, _c11, _c36, _c48, _c46, _c25, _c17, _c19, _c4, _c31, _c30, _c12, _c41, _c9, _c29, _c5, _c50, _c1, _c47, _c26, _c39, _c35, _c23, _c20, _c2, _c44, _c37, _c15, _c45, _c0, _c28, _c10, _c42, _c8, _c34, _c3, _c43, _c7, _c40, _c38, _c6, _c22, _c14, _c18, _c13, _c49, _c16, _c27, _c32, _c21];;\n'Aggregate [count(distinct 'BMP_TYPE) AS count(DISTINCT BMP_TYPE)#2123]\n+- Relation[_c0#173,_c1#174,_c2#175,_c3#176,_c4#177,_c5#178,_c6#179,_c7#180,_c8#181,_c9#182,_c10#183,_c11#184,_c12#185,_c13#186,_c14#187,_c15#188,_c16#189,_c17#190,_c18#191,_c19#192,_c20#193,_c21#194,_c22#195,_c23#196,... 27 more fields] csv\n"

In [64]:
# Change code to relevant names and variables

import pyspark.sql.functions as F
counts = sparkDf.agg(F.countDistinct('BMP_TYPE'))
query = """
SELECT BMP_TYPE, WARD, COUNT(*)
FROM water
GROUP BY BMP_TYPE, WARD
ORDER BY BMP_TYPE, WARD
"""
reviews_df.createOrReplaceTempView('water')
output = spark.sql(query)
show(output, n=1000)

AnalysisException: "cannot resolve '`BMP_TYPE`' given input columns: [_c24, _c33, _c11, _c36, _c48, _c46, _c25, _c17, _c19, _c4, _c31, _c30, _c12, _c41, _c9, _c29, _c5, _c50, _c1, _c47, _c26, _c39, _c35, _c23, _c20, _c2, _c44, _c37, _c15, _c45, _c0, _c28, _c10, _c42, _c8, _c34, _c3, _c43, _c7, _c40, _c38, _c6, _c22, _c14, _c18, _c13, _c49, _c16, _c27, _c32, _c21];;\n'Aggregate [count(distinct 'BMP_TYPE) AS count(DISTINCT BMP_TYPE)#2229]\n+- Relation[_c0#173,_c1#174,_c2#175,_c3#176,_c4#177,_c5#178,_c6#179,_c7#180,_c8#181,_c9#182,_c10#183,_c11#184,_c12#185,_c13#186,_c14#187,_c15#188,_c16#189,_c17#190,_c18#191,_c19#192,_c20#193,_c21#194,_c22#195,_c23#196,... 27 more fields] csv\n"

## Step 4: Practice with some more complex data
Documentation for data can be found [here](http://jmcauley.ucsd.edu/data/amazon/)

In [None]:
# Download data (run this only once)
#!wget http://snap.stanford.edu/data/amazon/productGraph/categoryFiles/reviews_Toys_and_Games_5.json.gz
#!gunzip reviews_Toys_and_Games_5.json.gz