Showcasing the process of how Mapper and Reducer functions work in Hadoop - Matthew Kondrak

Goal is to find the maximum temperature for each year in provided txt file

In [1]:
import pandas as pd
import warnings
warnings.simplefilter(action='ignore',category=FutureWarning)

Step 1: Data Cleaning / Preparing the data

In [2]:
#inputing the txt file
temp=pd.read_csv('/Users/matthewkondrak/desktop/temperatures.txt',header = None)
print(temp)

           0      1         2      3
0    (201803    95)   (201708    61)
1    (202002    88)   (201501    88)
2    (202004    76)   (201602    35)
3    (201012   113)   (201012   115)
4    (201009   114)   (201708   114)
..       ...    ...       ...    ...
995  (201906    97)   (201209    81)
996  (201001    74)   (201111    98)
997  (201209   115)   (201105    48)
998  (201009   105)   (201004    41)
999  (201409   107)   (201912   113)

[1000 rows x 4 columns]


In [3]:
#seperating the data into four columns
temp1=temp[0]
temp2=temp[1]
temp3=temp[2]
temp4=temp[3]

#removing punctuations and converting string
temp1=temp1.str.replace("(","")
column1=pd.DataFrame(temp1)
temp2=temp2.str.replace(")","")
column2=pd.DataFrame(temp2)
temp3=temp3.str.replace("(","")
column3=pd.DataFrame(temp3)
temp4=temp4.str.replace(")","")
column4=pd.DataFrame(temp4)

#joining data cleaned columns back together
DataFrame=pd.concat([column1,column2,column3,column4],join='inner',axis=1)
DataFrame=DataFrame.apply(pd.to_numeric)

#labeling the columns
dict= {0:"Year 1",1:"Temperature 1",2:"Year 2",3:"Temperature 2"}
DataFrame.rename(columns=dict,inplace=True)
print(DataFrame)

     Year 1  Temperature 1  Year 2  Temperature 2
0    201803             95  201708             61
1    202002             88  201501             88
2    202004             76  201602             35
3    201012            113  201012            115
4    201009            114  201708            114
..      ...            ...     ...            ...
995  201906             97  201209             81
996  201001             74  201111             98
997  201209            115  201105             48
998  201009            105  201004             41
999  201409            107  201912            113

[1000 rows x 4 columns]


Step 2: Splitting the data

In [4]:
part1=DataFrame[:500]
part2=DataFrame[500:]

Step 3: Mapper Function 1

In [5]:
#map per each column
map1=part1["Year 1"]
map2=part1["Temperature 1"]
map3=part1["Year 2"]
map4=part1["Temperature 2"]

#applying map function to each of the four maps in part1
DataFrame=pd.DataFrame(data=list(enumerate(map1,start=1)))
DataFrame1=DataFrame.applymap(str).applymap(lambda x:"{}".format(x[:-2]))
DataFrame1=DataFrame1.drop(DataFrame.columns[[0]],axis=1)

DataFrame=pd.DataFrame(data=list(enumerate(map2,start=1)))
DataFrame2=DataFrame.drop(DataFrame.columns[[0]],axis=1)

DataFrame=pd.DataFrame(data=list(enumerate(map3,start=1)))
DataFrame3=DataFrame.applymap(str).applymap(lambda x:"{}".format(x[:-2]))
DataFrame3=DataFrame3.drop(DataFrame.columns[[0]],axis=1)

DataFrame=pd.DataFrame(data=list(enumerate(map4,start=1)))
DataFrame4=DataFrame.drop(DataFrame.columns[[0]],axis=1)

#Concatenating the two Year columns
year1=pd.concat([DataFrame1,DataFrame3])
year1=year1.rename(columns={1:"Year"})

#Concatenating the two Temp columns
dftemp1=pd.concat([DataFrame2,DataFrame4])
outputframe1=pd.DataFrame(year1)
outputframe1['Temp']=dftemp1
print(outputframe1)

     Year  Temp
0    2018    95
1    2020    88
2    2020    76
3    2010   113
4    2010   114
..    ...   ...
495  2020    39
496  2018   113
497  2017   105
498  2020    66
499  2019    88

[1000 rows x 2 columns]


Step 3: Mapper Function 2

In [6]:
#map per each column
map5=part2["Year 1"]
map6=part2["Temperature 1"]
map7=part2["Year 2"]
map8=part2["Temperature 2"]

#applying map function to each of the four maps in part2
DataFrame=pd.DataFrame(data=list(enumerate(map5,start=1)))
DataFrame5=DataFrame.applymap(str).applymap(lambda s:"{}".format(s[:-2]))
DataFrame5=DataFrame5.drop(DataFrame.columns[[0]],axis=1)

DataFrame=pd.DataFrame(data=list(enumerate(map6,start=1)))
DataFrame6=DataFrame.drop(DataFrame.columns[[0]],axis=1)

DataFrame=pd.DataFrame(data=list(enumerate(map7,start=1)))
DataFrame7=DataFrame.applymap(str).applymap(lambda s:"{}".format(s[:-2]))
DataFrame7=DataFrame7.drop(DataFrame.columns[[0]],axis=1)

DataFrame=pd.DataFrame(data=list(enumerate(map8,start=1)))
DataFrame8=DataFrame.drop(DataFrame.columns[[0]],axis=1)

#Concatenating the two Year columns
year2=pd.concat([DataFrame5,DataFrame7])
year2=year2.rename(columns={1:"Year"})

#Concatenating the two Temp columns
dftemp2=pd.concat([DataFrame6,DataFrame8])
outputframe2=pd.DataFrame(year2)
outputframe2['Temp']=dftemp2
print(outputframe2)

     Year  Temp
0    2016    39
1    2014    66
2    2015    43
3    2015    94
4    2020    37
..    ...   ...
495  2012    81
496  2011    98
497  2011    48
498  2010    41
499  2019   113

[1000 rows x 2 columns]


In [7]:
df_part1=pd.DataFrame(outputframe1)
df_part2=pd.DataFrame(outputframe2)

Step 4: Sort Function in ascending order

In [8]:
sort=pd.concat([df_part1,df_part2])
#sorting by key in ascending order
sort=sort.sort_values(by='Year',ascending=True).reset_index(drop=True)
print(sort)

      Year  Temp
0     2010   105
1     2010   119
2     2010    98
3     2010   114
4     2010    54
...    ...   ...
1995  2020    74
1996  2020   116
1997  2020   108
1998  2020   103
1999  2020    36

[2000 rows x 2 columns]


Step 5: Partition Function & Step 6: Reducer Function

In [9]:
import functools
#reducer function for partition 1
sort['Year']=pd.to_datetime(sort['Year'])
reducer1=(sort['Year']<='2015')
reducer1=pd.DataFrame(sort.loc[reducer1])
#print(reducer1)

#finding maximum temperature for partition 1
output1=reducer1.groupby(lambda x:reducer1['Year'][x].year)["Temp"].max()

#reducer function for partition 2
sort['Year']=pd.to_datetime(sort['Year'])
reducer2=(sort['Year']>='2016')
reducer2=pd.DataFrame(sort.loc[reducer2])
#print(reducer2)

#finding maximum temperature for partition 2
output2=reducer2.groupby(lambda x:reducer2['Year'][x].year)["Temp"].max()

Step 7: Main Function

In [10]:
#combining the two reducer functions
results=(pd.concat([output1,output2]))
print(results)

2010    120
2011    120
2012    120
2013    120
2014    120
2015    120
2016    120
2017    120
2018    120
2019    120
2020    120
Name: Temp, dtype: int64


Saving Final Word Count Output as csv

In [11]:
import csv
resultsdf = pd.DataFrame(results,columns=['Temp'])
resultsdf.rename(columns = {0:'year',1:'max temp'}, inplace=True)

resultsdf.to_csv(r'/Users/matthewkondrak/Desktop/output.csv', header=True)