## Step 1 from Assignment
1. Load the global weather data into your big data technology of choice.
2. Join the stationlist.csv with the countrylist.csv to get the full country name
for each station number.
3. Join the global weather data with the full country names by station number.

In [1]:
#import pyspark spark session
from pyspark.sql import SparkSession



In [2]:
# create the Spark session
# set the app name to the assignment challenge
spark = SparkSession.\
        builder.\
        appName("de-weather-challenge").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()

In [3]:
# step 1.1 load data
station_df = spark.read.csv(path="../../data/stationlist.csv", sep=",", header=True)
station_df.count()

25306

In [4]:
station_df.show(10)

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
|033450|          UK|
|039290|          UK|
|039790|          EI|
|040480|          IC|
|041300|          IC|
+------+------------+
only showing top 10 rows



In [5]:
# get distinct country abbreviations
station_df.select("COUNTRY_ABBR").distinct().count()

251

In [6]:
country_df = spark.read.csv(path="../../data/countrylist.csv", sep=",", header=True)
country_df.show(10)

+------------+-------------------+
|COUNTRY_ABBR|       COUNTRY_FULL|
+------------+-------------------+
|          AA|              ARUBA|
|          AC|ANTIGUA AND BARBUDA|
|          AF|        AFGHANISTAN|
|          AG|            ALGERIA|
|          AI|   ASCENSION ISLAND|
|          AJ|         AZERBAIJAN|
|          AL|            ALBANIA|
|          AM|            ARMENIA|
|          AN|            ANDORRA|
|          AO|             ANGOLA|
+------------+-------------------+
only showing top 10 rows



In [7]:
country_df.count()

288

Finding: countrylist.csv contains 288 countries, but stationlist.csv references only 251 distinct country abbreviations, so at least 37 countries have no station.

Assumption: I will use an inner join to combine country_df and station_df, since every question requires weather data from stations. A country without a station is assumed to have no weather data.
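
The comparison behind this finding can be made explicit by diffing the two sets of abbreviations. The sketch below uses plain Python sets on a handful of rows rather than the notebook's Spark dataframes. The NO, SW, UK and AA rows come from the outputs above, while station `999999` and abbreviation `ZZ` are made up to show an unmatched station.

```python
# Rows of (STN_NO, COUNTRY_ABBR); the last row is hypothetical.
stations = [
    ("012240", "NO"),
    ("020690", "SW"),
    ("032690", "UK"),
    ("999999", "ZZ"),  # made-up station whose abbreviation is not in the country list
]
# Rows of (COUNTRY_ABBR, COUNTRY_FULL).
countries = [
    ("NO", "NORWAY"),
    ("SW", "SWEDEN"),
    ("UK", "UNITED KINGDOM"),
    ("AA", "ARUBA"),  # no station in this sample references AA
]

station_abbrs = {abbr for _, abbr in stations}
country_abbrs = {abbr for abbr, _ in countries}

# Countries an inner join drops because no station references them.
countries_without_stations = sorted(country_abbrs - station_abbrs)
# Abbreviations an inner join drops because the country list does not know them.
abbrs_without_country = sorted(station_abbrs - country_abbrs)

print(countries_without_stations)  # ['AA']
print(abbrs_without_country)       # ['ZZ']
```

In Spark the same check could be written with a `left_anti` join, which keeps only the rows of the left dataframe that have no match on the right.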

In [8]:
weather_df = spark.read.csv(path="../../data/data/2019/*", sep=",", header=True)
weather_df.show()

+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|  MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+-----+-----+-----+-----+------+
|010260|99999|20190101|26.1|21.2|1001.9| 987.5| 20.6| 9.0| 15.9| 29.7| 29.8|21.7*|0.02G| 18.5|001000|
|010260|99999|20190102|24.9|22.1|1020.1|1005.5|  5.4| 5.6| 13.6| 22.1|27.1*| 20.7|0.48G| 22.8|001000|
|010260|99999|20190103|31.7|29.1|1008.9| 994.7| 13.6|11.6| 21.4| 49.5|37.4*|26.8*|0.25G|999.9|011000|
|010260|99999|20190104|32.9|30.3|1011.4| 997.1| 15.8| 4.9|  7.8| 10.9| 36.1| 31.8|0.52G|999.9|001000|
|010260|99999|20190105|35.5|33.0|1015.7|1001.4| 12.0|10.4| 13.6| 21.0|38.5*| 32.7|0.02G| 23.6|010000|
|010260|99999|20190106|38.5|34.1|1008.2| 994.2| 12.8|10.0| 17.5| 28.9| 41.4|33.8*|0.12G| 23.2|010000|
|010260|99999|20190107|32.1|29.8| 996.8| 982.7|  6.9|11.3| 15.5| 28.6|35.1*| 30.4|

In [9]:
# clean up weather_df STN_NO name to match other dataframes
weather_df = weather_df.\
               withColumnRenamed("STN---", 'STN_NO')


In [10]:
#step 1.2 join country and station list

In [11]:
country_station_df = country_df.join(station_df, country_df.COUNTRY_ABBR == station_df.COUNTRY_ABBR)

In [13]:
country_station_df.cache()
country_station_df.count()

25209

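Finding: joining station and country returns 25209 records, but station contained 25306. The row counts do not match, so the join keys need a closer look: an inner join drops station rows whose COUNTRY_ABBR has no match in country_df, and duplicate keys on either side would add rows.

Solution: I will check for duplicate records in country and station.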
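
One way to read the two counts: an inner join emits one row for every pair of matching keys, so its size depends on how often each key occurs on each side. The sketch below computes that size in plain Python. `inner_join_count` is a hypothetical helper and the key lists are toy data, not the real files.

```python
from collections import Counter

def inner_join_count(left_keys, right_keys):
    """Number of rows an inner equi-join on these keys would produce."""
    left = Counter(left_keys)
    right = Counter(right_keys)
    # Each key contributes (occurrences on the left) x (occurrences on the right).
    # Null keys never match in SQL, so None is skipped.
    return sum(n * right[key] for key, n in left.items() if key is not None)

station_abbrs = ["NO", "SW", "SW", "UK", None, "ZZ"]  # 6 station rows (toy data)
country_abbrs = ["NO", "SW", "UK", "AA"]              # unique country keys

joined = inner_join_count(station_abbrs, country_abbrs)
print(joined)  # 4: the None and "ZZ" rows find no match and are dropped
```

If the country keys are unique, the result can never have more rows than the station side, so a count below 25306 means some station rows found no match.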

## Investigating and Mitigating duplication in country and station join

In [14]:
country_df.select("COUNTRY_ABBR").distinct().count()

288

In [15]:
station_df.select('STN_NO').distinct().count()

25297

Finding: station_df contains duplicate station numbers: 25306 rows but only 25297 distinct STN_NO values.

In [16]:
# find duplicate records in station_df by creating a temp view and running Spark SQL
# (createOrReplaceTempView keeps the cell re-runnable)
station_df.createOrReplaceTempView("station")
dup_stations = spark.sql(
    '''select STN_NO, count(STN_NO)
    from station
    group by STN_NO
    having count(STN_NO)>1'''
)

In [17]:
# join station_df on duplicate stations and order by station id, then join on country_df to see why there are duplicates
station_df.\
join(dup_stations, station_df.STN_NO == dup_stations.STN_NO).\
orderBy(station_df.STN_NO).\
join(country_df, country_df.COUNTRY_ABBR==station_df.COUNTRY_ABBR)\
.show(20)

+------+------------+------+-------------+------------+--------------------+
|STN_NO|COUNTRY_ABBR|STN_NO|count(STN_NO)|COUNTRY_ABBR|        COUNTRY_FULL|
+------+------------+------+-------------+------------+--------------------+
|785140|          US|785140|            2|          US|       UNITED STATES|
|785140|          RQ|785140|            2|          RQ|         PUERTO RICO|
|785145|          RQ|785145|            2|          RQ|         PUERTO RICO|
|785145|          US|785145|            2|          US|       UNITED STATES|
|785265|          US|785265|            2|          US|       UNITED STATES|
|785265|          RQ|785265|            2|          RQ|         PUERTO RICO|
|785510|          VQ|785510|            2|          VQ|VIRGIN ISLANDS (U...|
|785510|          US|785510|            2|          US|       UNITED STATES|
|788660|          NT|788660|            2|          NT|NETHERLANDS ANTILLES|
|788730|          NT|788730|            2|          NT|NETHERLANDS ANTILLES|

Finding: the duplicates come from stations that are listed under two countries, typically a territory and the country it belongs to. For example, Puerto Rico is a territory of the US.

Fix: there is no easy way to know which country is correct as of 2019. For this assignment I will group by station number and select the last COUNTRY_ABBR, assuming the newest records sit at the bottom of the table. This may not hold true, and Spark does not guarantee row order within a group, so the result of last() can change between runs, but it is good enough for this assignment. The appropriate method would be to create an override table and a transformation that corrects these rows once we know which country each station should be associated with in 2019.
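
The override-table idea can be sketched in plain Python. This is only an illustration under stated assumptions: `resolve_station_countries` and the `overrides` mapping are hypothetical names, the override entry is not a verified assignment, and the alphabetical tie-break is an arbitrary rule chosen only because it gives the same answer on every run.

```python
def resolve_station_countries(rows, overrides=None):
    """Return {STN_NO: COUNTRY_ABBR} with exactly one country per station.

    rows:      iterable of (STN_NO, COUNTRY_ABBR) pairs, possibly with duplicates
    overrides: optional {STN_NO: COUNTRY_ABBR} of manually confirmed assignments
    """
    overrides = overrides or {}
    candidates = {}
    for stn_no, abbr in rows:
        candidates.setdefault(stn_no, set()).add(abbr)

    resolved = {}
    for stn_no, abbrs in candidates.items():
        if stn_no in overrides:
            # A confirmed assignment always wins.
            resolved[stn_no] = overrides[stn_no]
        else:
            # Arbitrary but repeatable tie-break: alphabetically smallest abbreviation.
            resolved[stn_no] = min(abbrs)
    return resolved

rows = [
    ("785140", "US"), ("785140", "RQ"),
    ("785145", "RQ"), ("785145", "US"),
    ("012240", "NO"),
]
overrides = {"785140": "US"}  # hypothetical entry, not a verified assignment

resolved = resolve_station_countries(rows, overrides)
print(resolved)
# {'785140': 'US', '785145': 'RQ', '012240': 'NO'}
```

In Spark SQL, replacing `last(COUNTRY_ABBR)` with `min(COUNTRY_ABBR)` would give the same repeatable tie-break, since `min` does not depend on row order.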

In [18]:
# implement fix mentioned above
stations_corrected_df = spark.sql(
    '''select STN_NO, last(COUNTRY_ABBR) as COUNTRY_ABBR
    from station
    group by STN_NO
    '''
)

In [19]:
stations_corrected_df.count()

25297

In [20]:
# double check for duplicates on the corrected data frame (this also appears to be non-deterministic
# with respect to the order of the countries, i.e. both US and PUERTO RICO are selected,
# but it should consistently be one or the other)
stations_corrected_df.\
join(dup_stations, stations_corrected_df.STN_NO == dup_stations.STN_NO).\
orderBy(stations_corrected_df.STN_NO).\
join(country_df, country_df.COUNTRY_ABBR==stations_corrected_df.COUNTRY_ABBR)\
.show(20)

+------+------------+------+-------------+------------+--------------------+
|STN_NO|COUNTRY_ABBR|STN_NO|count(STN_NO)|COUNTRY_ABBR|        COUNTRY_FULL|
+------+------------+------+-------------+------------+--------------------+
|785140|          RQ|785140|            2|          RQ|         PUERTO RICO|
|785145|          US|785145|            2|          US|       UNITED STATES|
|785265|          RQ|785265|            2|          RQ|         PUERTO RICO|
|785510|          US|785510|            2|          US|       UNITED STATES|
|788660|          NT|788660|            2|          NT|NETHERLANDS ANTILLES|
|788730|          NL|788730|            2|          NL|         NETHERLANDS|
|789900|          NT|789900|            2|          NT|NETHERLANDS ANTILLES|
|917920|          FJ|917920|            2|          FJ|                FIJI|
+------+------------+------+-------------+------------+--------------------+



In [21]:
# redo step 1.2 with corrected data
country_station_df = country_df.join(stations_corrected_df, country_df.COUNTRY_ABBR == stations_corrected_df.COUNTRY_ABBR, how = 'inner')
country_station_df.cache()
country_station_df.count()

25201

In [22]:
country_station_df.show()

+------------+--------------+------+------------+
|COUNTRY_ABBR|  COUNTRY_FULL|STN_NO|COUNTRY_ABBR|
+------------+--------------+------+------------+
|          NO|        NORWAY|010875|          NO|
|          NO|        NORWAY|011350|          NO|
|          NO|        NORWAY|012960|          NO|
|          NO|        NORWAY|013840|          NO|
|          SW|        SWEDEN|020900|          SW|
|          SW|        SWEDEN|021310|          SW|
|          SW|        SWEDEN|021850|          SW|
|          SW|        SWEDEN|025080|          SW|
|          FI|       FINLAND|029810|          FI|
|          UK|UNITED KINGDOM|030064|          UK|
|          UK|UNITED KINGDOM|030540|          UK|
|          UK|UNITED KINGDOM|030920|          UK|
|          UK|UNITED KINGDOM|032570|          UK|
|          UK|UNITED KINGDOM|032800|          UK|
|          UK|UNITED KINGDOM|033735|          UK|
|          UK|UNITED KINGDOM|033920|          UK|
|          UK|UNITED KINGDOM|038880|          UK|


In [28]:
# step 1.3
master_df = country_station_df.join(weather_df, "STN_NO")
#master_df.cache()

In [29]:
master_df.printSchema()

root
 |-- STN_NO: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- YEARMODA: string (nullable = true)
 |-- TEMP: string (nullable = true)
 |-- DEWP: string (nullable = true)
 |-- SLP: string (nullable = true)
 |-- STP: string (nullable = true)
 |-- VISIB: string (nullable = true)
 |-- WDSP: string (nullable = true)
 |-- MXSPD: string (nullable = true)
 |-- GUST: string (nullable = true)
 |-- MAX: string (nullable = true)
 |-- MIN: string (nullable = true)
 |-- PRCP: string (nullable = true)
 |-- SNDP: string (nullable = true)
 |-- FRSHTT: string (nullable = true)



## Step 2
I couldn't make it further than this

In [34]:
#step 2.1
# mean temperature per country, hottest first
# cast needs an explicit target type: calling cast(TEMP) without one fails with
# java.lang.UnsupportedOperationException: empty.init
master_df.createOrReplaceTempView("master_data")
mean_temp = spark.sql(
    '''select COUNTRY_FULL, mean(cast(TEMP as double)) as MEAN_TEMP
    from master_data
    group by COUNTRY_FULL
    order by MEAN_TEMP desc
    '''
)
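
The per-country average this step is after can be tried out on a toy table. The block below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, so `AVG` and `REAL` take the place of Spark's `mean` and `double`. The station numbers and country names come from the outputs above, but the temperatures are made up. Treating `9999.9` as a missing-value marker for TEMP is an assumption about the data format that this notebook does not confirm.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# All columns are text, mirroring the all-string schema of master_df.
conn.execute("CREATE TABLE master_data (STN_NO TEXT, COUNTRY_FULL TEXT, TEMP TEXT)")
conn.executemany(
    "INSERT INTO master_data VALUES (?, ?, ?)",
    [
        ("010875", "NORWAY", "30.0"),    # temperatures are made up
        ("011350", "NORWAY", "34.0"),
        ("020900", "SWEDEN", "40.0"),
        ("020900", "SWEDEN", "9999.9"),  # assumed missing-value marker
    ],
)

query = """
    SELECT COUNTRY_FULL, AVG(CAST(TEMP AS REAL)) AS MEAN_TEMP
    FROM master_data
    WHERE TEMP <> '9999.9'       -- drop the assumed missing-value marker
    GROUP BY COUNTRY_FULL        -- group by the column that is selected
    ORDER BY MEAN_TEMP DESC
"""
result = conn.execute(query).fetchall()
print(result)  # [('SWEDEN', 40.0), ('NORWAY', 32.0)]
```

Without the `WHERE` filter the marker row would be averaged in as a real temperature and distort the ranking, so the actual missing-value convention is worth checking before trusting any mean.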


In [None]:
# step 2.2


In [None]:
# step 2.3