In [0]:
print(dbutils.fs.ls("/FileStore/"))

[FileInfo(path='dbfs:/FileStore/789700_11634_2010.optemp/', name='789700_11634_2010.optemp/', size=0), FileInfo(path='dbfs:/FileStore/865800_99999_2015.optemp/', name='865800_99999_2015.optemp/', size=0), FileInfo(path='dbfs:/FileStore/868240_99999_2017.optemp/', name='868240_99999_2017.optemp/', size=0), FileInfo(path='dbfs:/FileStore/946080_99999_2012.optemp/', name='946080_99999_2012.optemp/', size=0), FileInfo(path='dbfs:/FileStore/959370_99999_2011.optemp/', name='959370_99999_2011.optemp/', size=0), FileInfo(path='dbfs:/FileStore/998172_99999_2018.optemp/', name='998172_99999_2018.optemp/', size=0), FileInfo(path='dbfs:/FileStore/998246_99999_2013.optemp/', name='998246_99999_2013.optemp/', size=0), FileInfo(path='dbfs:/FileStore/999999_03761_2016.optemp/', name='999999_03761_2016.optemp/', size=0), FileInfo(path='dbfs:/FileStore/999999_23909_2014.optemp/', name='999999_23909_2014.optemp/', size=0), FileInfo(path='dbfs:/FileStore/A07354_00132_2019.optemp/', name='A07354_00132_201

In [0]:
import os
import subprocess
from datetime import datetime as dt
from datetime import timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import size
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import from_json, col
 
spark = SparkSession.builder.master("yarn")\
        .config("spark.port.maxRetries", 100)\
        .config("spark.executor.instances", "6")\
        .config("spark.executor.cores", "4")\
        .config("spark.executor.memory", "8G")\
        .config("spark.driver.memory", "2G")\
        .config("spark.dynamicAllocation.enabled", "false")\
        .config("spark.yarn.queue", "Low")\
        .config("spark.port.maxRetries", 100)\
        .appName("New_Data_Reader_V0")\
        .getOrCreate()

In [0]:
from pyspark.sql.types import StructType, StructField
import os
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType
 
def clean_data(file_path):
    inTextData = spark.read.format("csv").option("header", "true").option("delimiter","\t").load(file_path)
    schema = StructType([
        StructField("STN", StringType()),
        StructField("WBAN", IntegerType()),
        StructField("YEARMODA", StringType()),
        StructField("TEMP", StringType()),
        StructField("DEWP", StringType()),
        StructField("SLP", StringType()),
        StructField("STP", StringType()),
        StructField("VISIB", StringType()),
        StructField("WDSP", StringType()),
        StructField("MXSPD", StringType()),
        StructField("GUST", StringType()),
        StructField("MAX", StringType()),
        StructField("MIN", StringType()),
        StructField("PRCP", StringType()),
        StructField("SNDP", StringType()),
        StructField("FRSHTT", StringType())
    ])
    inTextData.count()
    name_list = inTextData.schema.names
    name_list = str(name_list).strip("['']").split(' ')
    names = []
    for item in name_list:
        if len(item)>0:
            names.append(item)
    print(names)
    rdd1 = inTextData.rdd
    rdd2 = rdd1.map(lambda x: str(x).split('=')[1])
    rdd3 = rdd2.map(lambda x: ' '.join(x.split()))
    rdd4 = rdd3.map(lambda x: x[1:-2])
    dbutils.fs.rm(f'/tmp/{os.path.basename(file_path)}', True) 
    rdd4.saveAsTextFile(f'/tmp/{os.path.basename(file_path)}')
    newInData = spark.read.csv(f'/tmp/{os.path.basename(file_path)}',header=False,sep=' ')
    cleanData = newInData.drop('_c1','_c4','_c6','_c8','_c10','_c12','_c14')
    cleanData = cleanData.withColumnRenamed('_c0','STN').withColumnRenamed('_c2','YEARMODA')\
                        .withColumnRenamed('_c3','TEMP').withColumnRenamed('_c5','DEWP')\
                        .withColumnRenamed('_c7','SLP').withColumnRenamed('_c9','STP')\
                        .withColumnRenamed('_c11','VISIB').withColumnRenamed('_c13','WDSP')\
                        .withColumnRenamed('_c15','MXSPD').withColumnRenamed('_c16','GUST')\
                        .withColumnRenamed('_c17','MAX').withColumnRenamed('_c18','MIN')\
                        .withColumnRenamed('_c19','PRCP').withColumnRenamed('_c20','SNDP')\
                        .withColumnRenamed('_c21','FRSHTT')    
    return cleanData

In [0]:
dbutils.fs.ls("/FileStore/")

Out[12]: [FileInfo(path='dbfs:/FileStore/2010.op', name='2010.op', size=50874),
 FileInfo(path='dbfs:/FileStore/2011.op', name='2011.op', size=50874),
 FileInfo(path='dbfs:/FileStore/2012.op', name='2012.op', size=51013),
 FileInfo(path='dbfs:/FileStore/2013.op', name='2013.op', size=50735),
 FileInfo(path='dbfs:/FileStore/2014.op', name='2014.op', size=50596),
 FileInfo(path='dbfs:/FileStore/2015.op', name='2015.op', size=50874),
 FileInfo(path='dbfs:/FileStore/2016.op', name='2016.op', size=51013),
 FileInfo(path='dbfs:/FileStore/2017.op', name='2017.op', size=50735),
 FileInfo(path='dbfs:/FileStore/2018.op', name='2018.op', size=48094),
 FileInfo(path='dbfs:/FileStore/2019.op', name='2019.op', size=49345),
 FileInfo(path='dbfs:/FileStore/789700_11634_2010.optemp/', name='789700_11634_2010.optemp/', size=0),
 FileInfo(path='dbfs:/FileStore/865800_99999_2015.optemp/', name='865800_99999_2015.optemp/', size=0),
 FileInfo(path='dbfs:/FileStore/868240_99999_2017.optemp/', name='868240_99

In [0]:
data_2010 = clean_data('/FileStore/2010.op')
data_2011 = clean_data('/FileStore/2011.op')
data_2012 = clean_data('/FileStore/2012.op')
data_2013 = clean_data('/FileStore/2013.op')
data_2014 = clean_data('/FileStore/2014.op')
data_2015 = clean_data('/FileStore/2015.op')
data_2016 = clean_data('/FileStore/2016.op')
data_2017 = clean_data('/FileStore/2017.op')
data_2018 = clean_data('/FileStore/2018.op')
data_2019 = clean_data('/FileStore/2019.op')      

['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT']
['STN---', 'WBAN', 'YEARMODA', 'TEMP', 'DEWP', 'SLP', '

In [0]:
data_2010.createOrReplaceTempView('cleanData2010')
data_2011.createOrReplaceTempView('cleanData2011')
data_2012.createOrReplaceTempView('cleanData2012')
data_2013.createOrReplaceTempView('cleanData2013')
data_2014.createOrReplaceTempView('cleanData2014')
data_2015.createOrReplaceTempView('cleanData2015')
data_2016.createOrReplaceTempView('cleanData2016')
data_2017.createOrReplaceTempView('cleanData2017')
data_2018.createOrReplaceTempView('cleanData2018')
data_2019.createOrReplaceTempView('cleanData2019')

#HIGHEST AND LOWEST TEMP FROM YEAR 2010 - 2019

In [0]:
highest_lowest_temp_2010 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2010 order by TEMP desc LIMIT(1))UNION ALL (select STN, TEMP,yearmoda from cleanData2010 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2010)

STN,TEMP,yearmoda
789700,85.7,20100423
789700,75.8,20101222


In [0]:
highest_lowest_temp_2011 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2011 order by TEMP desc LIMIT(1)) UNION ALL(select STN, TEMP, yearmoda from cleanData2011 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2011)

STN,TEMP,yearmoda
959370,84.3,20110201
959370,41.3,20110707


In [0]:
highest_lowest_temp_2012 = spark.sql("select * FROM (select STN,TEMP,yearmoda from cleanData2012 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP, yearmoda from cleanData2012 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2012)

STN,TEMP,yearmoda
946080,91.8,20120125
946080,45.1,20120724


In [0]:
highest_lowest_temp_2013 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2013 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP, yearmoda from cleanData2013 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2013)

STN,TEMP,yearmoda
998246,84.5,20130804
998246,72.6,20130103


In [0]:
highest_lowest_temp_2014 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2014 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP, yearmoda from cleanData2014 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2014)

STN,TEMP,yearmoda
999999,82.9,20140823
999999,-0.1,20140106


In [0]:
highest_lowest_temp_2015 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2015 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP,yearmoda from cleanData2015 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2015)

STN,TEMP,yearmoda
865800,80.6,20151212
865800,40.5,20150619


In [0]:
highest_lowest_temp_2016 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2016 order by TEMP desc LIMIT(1)) UNION ALL(select STN, TEMP, yearmoda from cleanData2016 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2016)

STN,TEMP,yearmoda
999999,9.4,20160214
999999,15.7,20160212


In [0]:
highest_lowest_temp_2017 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2017 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP, yearmoda from cleanData2017 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2017)

STN,TEMP,yearmoda
868240,81.0,20170111
868240,54.9,20170704


In [0]:
highest_lowest_temp_2018 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2018 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP, yearmoda from cleanData2018 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2018)

STN,TEMP,yearmoda
998172,83.1,20180414
998172,24.7,20180111


In [0]:
highest_lowest_temp_2019 = spark.sql("select * FROM (select STN, TEMP, yearmoda from cleanData2019 order by TEMP desc LIMIT(1)) UNION ALL (select STN, TEMP, yearmoda from cleanData2019 order by TEMP ASC LIMIT(1))")
display(highest_lowest_temp_2019)

STN,TEMP,yearmoda
A07354,81.5,20190719
A07354,-1.5,20190201


#HIGHEST AND LOWEST TEMP FROM ALL THE YEARS COMBINED

In [0]:
df_combined = data_2019.unionAll(data_2018).unionAll(data_2017).unionAll(data_2016).unionAll(data_2015).unionAll(data_2014).unionAll(data_2013).unionAll(data_2012).unionAll(data_2011).unionAll(data_2010)

In [0]:
df_combined.createOrReplaceTempView('CombinedData')

In [0]:
highlest_lower_over_all_years = spark.sql("select * FROM (select TEMP, yearmoda from CombinedData order by TEMP desc LIMIT(1)) UNION ALL (select TEMP, yearmoda from CombinedData where stn <> '999999' order by TEMP ASC LIMIT(1))")
display(highlest_lower_over_all_years)

TEMP,yearmoda
91.8,20120125
-1.5,20190201


#MAXIMUM AND MINIMUM PRECIPITATION FOR 2015

In [0]:
#Preciptation 
highest_lowest_precipation_2015 = spark.sql("select * from ((select max(prcp) from cleanData2015 where prcp <> '99.99') UNION ALL (select min(prcp) from cleanData2015 where prcp <> '0.0'))")
display(highest_lowest_precipation_2015)

max(prcp)
4.96G
0.00A


#PERCENTAGE OF MISSING VALUES OF STP IN 2019

In [0]:
print('Missing Values of STP in 2019 data are as follows')
print(100*data_2019.where(data_2019.STP == '9999.9').count()/data_2019.count(), 'values')

Missing Values of STP in 2019 data are as follows
0.0 values


#MAXIMUM GUST IN THE YEAR 2019

In [0]:
max_gust_2019 = spark.sql("select STN, substring(YEARMODA, 5, 2) as MONTH, SUBSTRING(YEARMODA, 7,2) as DATE, SUBSTRING(YEARMODA,1,4) as YEAR, gust from cleanData2019 where gust <> 999.9 order by gust desc LIMIT(1)")
display(max_gust_2019)

STN,MONTH,DATE,YEAR,gust
A07354,6,27,2019,53.0
