# Code for working on the results of the map reduce function

In [1]:
import os
os.environ['SPARK_HOME'] ="/opt/cloudera/parcels/CDH-5.8.0-1.cdh5.8.0.p0.42/lib/spark"

In [2]:
import findspark
findspark.init()

In [3]:
import pyspark
conf = pyspark.SparkConf().\
    setAppName('test_app').\
    set('spark.port.maxRetries', 60).\
    set('spark.yarn.appMasterEnv.PYSPARK_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    set('spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON', '/home/deacuna/anaconda3/bin/python').\
    setMaster('yarn-client').\
    set('executor.memory', '1000m').\
    set('spark.yarn.executor.memoryOverhead', '4098').\
    set('spark.sql.codegen', 'true').\
    set('spark.yarn.executor.memory', '500m').\
    set('yarn.scheduler.minimum-allocation-mb', '500m').\
    set('spark.dynamicAllocation.maxExecutors', '3').\
    set('jars', 'hdfs://eggs/graphframes-0.1.0-spark1.6.jar').\
    set('spark.driver.maxResultSize', '4g')

In [4]:
from pyspark.sql import SQLContext, HiveContext
sc = pyspark.SparkContext(conf=conf)
sqlContext = HiveContext(sc)

In [5]:
from pyspark.sql import functions as fn

# Convert the tab separated file output of mapreduce into a parquet file for faster execution

In [6]:
mapRed_rdd = sc.textFile("/user/alain/IST718/data/data.tsv") # read the tab separated file

In [7]:
mapRed_rdd = mapRed_rdd.map(lambda x: x.split('\t'))

In [8]:
mapRed_df = mapRed_rdd.toDF(['StationID', 'CountryCode', 'Year', 'Month', 'Element', 'MonthAverage'])

In [9]:
mapRed_df.write.parquet('IST718/data/mapRed.parquet') # save the parquet file

# Start working on the parquet file

In [6]:
mapRed_parquet = sqlContext.read.parquet("IST718/data/mapRed.parquet")

In [7]:
mapRed_parquet.registerTempTable("weather") # register it as a temporary SQL table

In [8]:
mapRed_parquet.columns # check column names

['StationID', 'CountryCode', 'Year', 'Month', 'Element', 'MonthAverage']

In [10]:
us_temp_df = sqlContext.sql("""
SELECT Year, Month, MonthAverage 
FROM weather 
WHERE CountryCode = \"US\" AND Element = \"TMAX\"""")

In [35]:
def season_type(df):
    m = int(df['Month'])
    spring = range(3, 7)
    summer = range(7, 10)
    fall = range(10, 13)
    if m in spring:
        season = "Spring"
    elif m in summer:
        season = "Summer"
    elif m in fall:
        season = "Fall"
    else:
        season = "Winter"
    return season

In [19]:
#us_temp_df.withColumn('season', season_type(fn.col('Month'))).show(6)
us_temp_pdf = us_temp_df.toPandas()

In [36]:
us_temp_pdf['Season'] = us_temp_pdf.apply(season_type, axis = 1)

In [38]:
us_temp_pdf.to_csv('/users/alain/IST718/USTemp.csv', index = False)

In [39]:
us_temp_pdf.take

<bound method NDFrame.take of          Year Month    MonthAverage  Season
0        2008    10           134.5    Fall
1        2008    11   54.3333333333    Fall
2        2008    12  -47.9677419355    Fall
3        2009    01  -56.6774193548  Winter
4        2009    02   1.21428571429  Winter
5        2009    03            59.7  Spring
6        2009    04           132.3  Spring
7        2009    05   214.225806452  Spring
8        2009    06   239.166666667  Spring
9        2009    07   253.161290323  Summer
10       2009    08   250.161290323  Summer
11       2009    09   227.689655172  Summer
12       2009    10   77.7741935484    Fall
13       2009    11           109.6    Fall
14       2009    12   -59.935483871    Fall
15       2010    01  -83.2580645161  Winter
16       2010    02  -54.1071428571  Winter
17       2010    03   72.8387096774  Spring
18       2010    04   182.344827586  Spring
19       2010    05   203.387096774  Spring
20       2010    06   253.392857143  Spring
21

In [12]:
# Check the year with the most countries records
sqlContext.sql("select Year, COUNT(DISTINCT CountryCode) AS count FROM weather GROUP BY Year ORDER BY count DESC").show(200)

+----+-----+
|Year|count|
+----+-----+
|1982|  206|
|1984|  206|
|1985|  206|
|1986|  206|
|1988|  206|
|1989|  206|
|1990|  205|
|1981|  205|
|1983|  205|
|1987|  205|
|1991|  204|
|1993|  204|
|1994|  204|
|1979|  204|
|1980|  203|
|1992|  202|
|1995|  202|
|1974|  202|
|1975|  202|
|1977|  202|
|2011|  202|
|2012|  202|
|2014|  202|
|1973|  201|
|2005|  201|
|1978|  201|
|2010|  201|
|2013|  201|
|2004|  200|
|2006|  200|
|2008|  200|
|2009|  200|
|1976|  200|
|2015|  200|
|1997|  199|
|1996|  199|
|2007|  199|
|2001|  198|
|2003|  198|
|2016|  198|
|1998|  197|
|2002|  196|
|2000|  195|
|1999|  194|
|1957|  160|
|1960|  159|
|1961|  159|
|1958|  158|
|1959|  158|
|1962|  157|
|1963|  154|
|1964|  153|
|1956|  152|
|1965|  152|
|1955|  150|
|1967|  148|
|1966|  144|
|1954|  142|
|1968|  142|
|1953|  140|
|1969|  136|
|1972|  136|
|1971|  136|
|1951|  136|
|1952|  136|
|1970|  134|
|1945|  128|
|1950|  127|
|1949|  122|
|1946|  120|
|1944|  118|
|1947|  112|
|1948|  110|
|1943|  108|

In [18]:
t.show(5)

+-----------+----+-------+------------------+
|CountryCode|Year|Element|               _c3|
+-----------+----+-------+------------------+
|         IN|1937|   PRCP|34.381267549292794|
|         IR|1961|   TMAX|230.86387395459022|
|         IS|2008|   TMAX| 264.3117430971986|
|         IS|2013|   TMIN|157.70138864770337|
|         IT|2014|   SNWD|451.54899889787794|
|         IT|1818|   PRCP|14.750377624171668|
|         IT|1837|   TMIN| 89.64576612910248|
|         IT|1966|   WSFG| 84.45246272476524|
|         IT|1968|   SNOW|               0.0|
|         IT|1961|   WDFG|              null|
+-----------+----+-------+------------------+
only showing top 10 rows



In [8]:
countriesISO_rdd = sc.textFile("/user/alain/IST718/data/ghcnd-stations_ISO.csv").map(lambda line: line.split(","))
countriesISO_df = countriesISO_rdd.toDF(['CountryCode', 'ISO2', 'ISO3']) # Country code is the pseudo ISO2 codes

In [9]:
# the latest year with most countries data is 2014.
currentTemp_df = sqlContext.sql("""
SELECT CountryCode, AVG(MonthAverage) AS Value
FROM weather
WHERE Year = 2014
GROUP BY CountryCode
""")

In [10]:
# join with the countries ISO codes to create a map
currentTemp_df = currentTemp_df.join(countriesISO_df, "CountryCode") 

In [22]:
currentTemp_df.show()

+-----------+------------------+----+----+
|CountryCode|             Value|ISO2|ISO3|
+-----------+------------------+----+----+
|         MT|136.05849549992467|  MT| MLT|
|         GG|133.64580390438186|  GE| GEO|
|         MU|209.97831977056083|  OM| OMN|
|         TD|200.71485143098442|  TT| TTO|
|         GH|210.86333270189337|  GH| GHA|
|         MV|191.63336965820798|  MV| MDV|
|         TE|   213.51147800335|  TF| ATF|
|         GI|131.47983469442053|  GI| GIB|
|         MX|158.97488953947175|  MX| MEX|
|         MY|176.95553756564712|  MY| MYS|
|         TH|218.82653060415026|  TH| THA|
|         GL|-4.202453541350763|  GL| GRL|
|         MZ|179.08647911844764|  MZ| MOZ|
|         TI| 139.2794394394776|  UZ| UZB|
|         GM| 63.89786796652545|  DE| DEU|
|         GP|187.95435638797503|  FR| FRA|
|         GQ| 86.55699974681225|  GU| GUM|
|         GR| 88.53482818030292|  GR| GRC|
|         TO|212.78754437972705|  GH| GHA|
|         AE| 230.3588800630487|  AE| ARE|
+----------

In [14]:
# select data from 100 years ago from 2014 (i.e., 1914).
oldTemp_df = sqlContext.sql("""
SELECT CountryCode, AVG(MonthAverage) AS OldValue
FROM weather
WHERE Year = 1914
GROUP BY CountryCode
""")

In [15]:
# join with the countries ISO codes to create a map
oldTemp_df = oldTemp_df.join(countriesISO_df, "CountryCode") 

In [16]:
# join with the countries ISO codes to create a map
oldTemp_df = oldTemp_df.join(currentTemp_df, "CountryCode") 

In [21]:
oldTemp_df.show(5)

+-----------+-------------------+----+----+------------------+----+----+
|CountryCode|           OldValue|ISO2|ISO3|             Value|ISO2|ISO3|
+-----------+-------------------+----+----+------------------+----+----+
|         GG| 103.55570677776406|  GE| GEO|133.64580390438186|  GE| GEO|
|         MX|  87.72729039850554|  MX| MEX|158.97488953947175|  MX| MEX|
|         GL|-25.457227129197207|  GL| GRL|-4.202453541350763|  GL| GRL|
|         TI| 56.103212492849096|  UZ| UZB| 139.2794394394776|  UZ| UZB|
|         GM|  74.48523994505423|  DE| DEU| 63.89786796652545|  DE| DEU|
+-----------+-------------------+----+----+------------------+----+----+
only showing top 5 rows



In [39]:
oldTemp_pdf = oldTemp_df.toPandas()

In [40]:
oldTemp_pdf['TempDiff'] = oldTemp_pdf.Value - oldTemp_pdf.OldValue

In [41]:
oldTemp_pdf.drop(oldTemp_pdf.columns[[1, 4]], axis = 1, inplace = True)

In [42]:
oldTemp_pdf

Unnamed: 0,CountryCode,ISO2,ISO3,ISO2.1,ISO3.1,TempDiff
0,GG,GE,GEO,GE,GEO,30.090097
1,MX,MX,MEX,MX,MEX,71.247599
2,GL,GL,GRL,GL,GRL,21.254774
3,TI,UZ,UZB,UZ,UZB,83.176227
4,GM,DE,DEU,DE,DEU,-10.587372
5,AG,DZ,DZA,DZ,DZA,15.521613
6,TS,TN,TUN,TN,TUN,3.713935
7,AJ,AZ,AZE,AZ,AZE,119.161460
8,NG,NE,NER,NE,NER,-8.404112
9,AM,AM,ARM,AM,ARM,22.860621


In [23]:
currentTemp_df.toPandas().to_csv('/users/alain/IST718/currentTemp.csv', index = False) # save the results

In [43]:
oldTemp_pdf.to_csv('/users/alain/IST718/TempDiff.csv', index = False) # save the results