In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, StructType, StructField
from pyspark.sql.functions import when, count, col, isnull

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
schema = StructType([StructField('STN---', IntegerType(), False),
                     StructField('WBAN', IntegerType(), True),
                     StructField('YEARMODA', StringType(), True),
                     StructField('TEMP', FloatType(), True),
                     StructField('DEWP', FloatType(), True),
                     StructField('SLP', FloatType(), True),
                     StructField('STP', FloatType(), True),
                     StructField('VISIB', FloatType(), True),
                     StructField('WDSP', FloatType(), True),
                     StructField('MXSPD', FloatType(), True),
                     StructField('GUST', FloatType(), True),
                     StructField('MAX', StringType(), True),
                     StructField('MIN', StringType(), True),
                     StructField('PRCP', StringType(), True),
                     StructField('SNDP', FloatType(), True),
                     StructField('FRSHTT', StringType(), True)])

In [4]:
weather = spark.read \
               .format("csv") \
               .option("header", "true") \
               .load("data/2019/*.csv", schema=schema)
weather.createOrReplaceTempView('weather')     

In [5]:
weather.show()

+------+-----+--------+----+----+------+------+-----+----+-----+-----+------+-----+-----+-----+------+
|STN---| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP|VISIB|WDSP|MXSPD| GUST|   MAX|  MIN| PRCP| SNDP|FRSHTT|
+------+-----+--------+----+----+------+------+-----+----+-----+-----+------+-----+-----+-----+------+
|958360|99999|20190101|78.8|54.9|9999.9|9999.9|999.9| 8.8| 13.0|999.9| 96.1*| 61.9|0.00G|999.9|000000|
|958360|99999|20190102|73.1|53.7|9999.9|9999.9|999.9| 9.5| 14.0|999.9| 89.2*|57.4*|0.00G|999.9|000000|
|958360|99999|20190103|79.5|47.4|9999.9|9999.9|999.9| 3.2|  8.0|999.9| 96.6*| 57.2|0.00G|999.9|000000|
|958360|99999|20190104|82.7|52.0|9999.9|9999.9|999.9|13.0| 19.0|999.9|109.8*| 60.6|0.02G|999.9|000000|
|958360|99999|20190105|61.9|47.7|9999.9|9999.9|999.9| 8.5| 15.9|999.9| 70.5*|52.3*|0.02G|999.9|010000|
|958360|99999|20190106|68.6|48.1|9999.9|9999.9|999.9| 9.2| 13.0|999.9| 79.9*| 52.0|0.00G|999.9|000000|
|958360|99999|20190107|75.3|53.3|9999.9|9999.9|999.9| 5.9|  9.9|999.9| 87

In [6]:
weather.select('WBAN').describe().show()

+-------+------------------+
|summary|              WBAN|
+-------+------------------+
|  count|           4158416|
|   mean|  86601.7904581937|
| stddev|30631.749499926314|
|    min|               102|
|    max|             99999|
+-------+------------------+



In [7]:
weather = spark.sql("""

SELECT CAST( `STN---` AS STRING ) AS STN_NO

     , CASE 
          WHEN WBAN = 99999 THEN
               NULL
          ELSE
               CAST( WBAN AS STRING )
       END AS WBAN

     , TO_DATE( YEARMODA, 'yyyyMMdd' ) AS DT_REF

     , CASE 
          WHEN TEMP * 10 = 99999 THEN
               NULL 
          ELSE 
               TEMP * 10 
       END AS TEMP

     , CASE 
          WHEN DEWP * 10 = 99999 THEN
               NULL 
          ELSE 
               DEWP * 10 
       END AS DEWP

     , CASE 
          WHEN SLP * 10 = 99999 THEN
               NULL 
          ELSE 
               SLP * 10 
       END AS SLP

     , CASE 
          WHEN STP * 10 = 99999 THEN
               NULL 
          ELSE 
               STP *10
       END AS STP

     , CASE 
          WHEN VISIB * 10 = 9999 THEN
               NULL 
          ELSE 
               VISIB * 10 
       END AS VISIB

     , CASE 
          WHEN WDSP * 10 = 9999 THEN
               NULL 
          ELSE 
               WDSP * 10 
       END AS WDSP

     , CASE 
          WHEN MXSPD * 10 = 9999 THEN
               NULL
          ELSE 
               MXSPD * 10 
       END AS MXSPD

     , CASE 
          WHEN GUST * 10 = 9999 THEN 
               NULL 
          ELSE 
               GUST * 10
       END AS GUST

     , CASE 
          WHEN MAX = '9999.9' THEN
               NULL 
          ELSE 
               CAST( REGEXP_REPLACE(MAX, '\\\Q*\\\E', '') AS REAL ) * 10 
       END AS MAX

     , CASE
          WHEN MIN = '9999.9'THEN
               NULL 
          ELSE 
               CAST( REGEXP_REPLACE(MIN, '\\\Q*\\\E', '') AS REAL ) * 10
       END AS MIN

     , CASE 
          WHEN PRCP = '99.99' OR PRCP = '0.00G' THEN 
               NULL 
          ELSE 
               CAST( REGEXP_REPLACE(PRCP, '\\\QG\\\E', '') AS REAL ) * 100
       END AS PRCP

     , CASE 
          WHEN SNDP * 10 = 9999 THEN
               NULL 
          ELSE 
               SNDP * 10
       END AS SNDP

     , CAST( SUBSTR( FRSHTT, 1, 1 ) AS INTEGER ) AS FOG

     , CAST( SUBSTR( FRSHTT, 2, 1 ) AS INTEGER ) AS RAIN

     , CAST( SUBSTR( FRSHTT, 3, 1 ) AS INTEGER ) AS SNOW

     , CAST( SUBSTR( FRSHTT, 4, 1 ) AS INTEGER ) AS HAIL

     , CAST( SUBSTR( FRSHTT, 5, 1 ) AS INTEGER ) AS THUNDER

     , CAST( SUBSTR( FRSHTT, 6, 1 ) AS INTEGER ) AS TORNADO
FROM weather
""")
weather.show()
weather.createOrReplaceTempView('weather')

+------+----+----------+-----+-----+----+----+-----+-----+-----+----+------+-----+----+----+---+----+----+----+-------+-------+
|STN_NO|WBAN|    DT_REF| TEMP| DEWP| SLP| STP|VISIB| WDSP|MXSPD|GUST|   MAX|  MIN|PRCP|SNDP|FOG|RAIN|SNOW|HAIL|THUNDER|TORNADO|
+------+----+----------+-----+-----+----+----+-----+-----+-----+----+------+-----+----+----+---+----+----+----+-------+-------+
|958360|null|2019-01-01|788.0|549.0|null|null| null| 88.0|130.0|null| 961.0|619.0|null|null|  0|   0|   0|   0|      0|      0|
|958360|null|2019-01-02|731.0|537.0|null|null| null| 95.0|140.0|null| 892.0|574.0|null|null|  0|   0|   0|   0|      0|      0|
|958360|null|2019-01-03|795.0|474.0|null|null| null| 32.0| 80.0|null| 966.0|572.0|null|null|  0|   0|   0|   0|      0|      0|
|958360|null|2019-01-04|827.0|520.0|null|null| null|130.0|190.0|null|1098.0|606.0| 2.0|null|  0|   0|   0|   0|      0|      0|
|958360|null|2019-01-05|619.0|477.0|null|null| null| 85.0|159.0|null| 705.0|523.0| 2.0|null|  0|   1|   

In [8]:
weather.printSchema()

root
 |-- STN_NO: string (nullable = true)
 |-- WBAN: string (nullable = true)
 |-- DT_REF: date (nullable = true)
 |-- TEMP: float (nullable = true)
 |-- DEWP: float (nullable = true)
 |-- SLP: float (nullable = true)
 |-- STP: float (nullable = true)
 |-- VISIB: float (nullable = true)
 |-- WDSP: float (nullable = true)
 |-- MXSPD: float (nullable = true)
 |-- GUST: float (nullable = true)
 |-- MAX: float (nullable = true)
 |-- MIN: float (nullable = true)
 |-- PRCP: float (nullable = true)
 |-- SNDP: float (nullable = true)
 |-- FOG: integer (nullable = true)
 |-- RAIN: integer (nullable = true)
 |-- SNOW: integer (nullable = true)
 |-- HAIL: integer (nullable = true)
 |-- THUNDER: integer (nullable = true)
 |-- TORNADO: integer (nullable = true)



## Nulls

In [9]:
total = weather.count()
total

4158416

In [10]:
weather.select([(100 * count(when(isnull(c), c)) / total).alias(c).cast(IntegerType()) for c in weather.columns]).show()

+------+----+------+----+----+---+---+-----+----+-----+----+---+---+----+----+---+----+----+----+-------+-------+
|STN_NO|WBAN|DT_REF|TEMP|DEWP|SLP|STP|VISIB|WDSP|MXSPD|GUST|MAX|MIN|PRCP|SNDP|FOG|RAIN|SNOW|HAIL|THUNDER|TORNADO|
+------+----+------+----+----+---+---+-----+----+-----+----+---+---+----+----+---+----+----+----+-------+-------+
|     0|  79|     0|   0|   4| 34| 30|   28|   4|    5|  73|  0|  0|  71|  93|  0|   0|   0|   0|      0|      0|
+------+----+------+----+----+---+---+-----+----+-----+----+---+---+----+----+---+----+----+----+-------+-------+



WBAN, GUST, PRCP, SNDP seem useless. But let's keep them for a while because there are some joins to be done. If we need to predict a target for a particular country, these percentages may be different.

## Stations

In [11]:
stations = spark.read \
                .format("csv") \
                .option("header", "true") \
                .option("inferSchema", "true") \
                .load("stationlist.csv")
stations.createOrReplaceTempView('stations')
stations.show()

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|012240|          NO|
|020690|          SW|
|020870|          SW|
|021190|          SW|
|032690|          UK|
|033450|          UK|
|039290|          UK|
|039790|          EI|
|040480|          IC|
|041300|          IC|
|060100|          FO|
|061443|          DA|
|063401|          NL|
|071910|          FR|
|092640|          GM|
|123766|          PL|
|125990|          PL|
|129700|          HU|
|132240|          HR|
|156500|          BU|
+------+------------+
only showing top 20 rows



In [12]:
stations.printSchema()

root
 |-- STN_NO: string (nullable = true)
 |-- COUNTRY_ABBR: string (nullable = true)



In [13]:
stations.count()

25306

## Countries

In [14]:
countries = spark.read \
               .format("csv") \
               .option("header", "true") \
               .option("inferSchema", "true") \
               .load("countrylist.csv")
countries.createOrReplaceTempView('countries')     
countries.printSchema()

root
 |-- COUNTRY_ABBR: string (nullable = true)
 |-- COUNTRY_FULL: string (nullable = true)



In [15]:
countries.show()

+------------+--------------------+
|COUNTRY_ABBR|        COUNTRY_FULL|
+------------+--------------------+
|          AA|               ARUBA|
|          AC| ANTIGUA AND BARBUDA|
|          AF|         AFGHANISTAN|
|          AG|             ALGERIA|
|          AI|    ASCENSION ISLAND|
|          AJ|          AZERBAIJAN|
|          AL|             ALBANIA|
|          AM|             ARMENIA|
|          AN|             ANDORRA|
|          AO|              ANGOLA|
|          AQ|      AMERICAN SAMOA|
|          AR|           ARGENTINA|
|          AS|           AUSTRALIA|
|          AT|ASHMORE AND CARTI...|
|          AU|             AUSTRIA|
|          AV|            ANGUILLA|
|          AX|             ANTIGUA|
|          AY|          ANTARCTICA|
|          AZ|              AZORES|
|          BA|             BAHRAIN|
+------------+--------------------+
only showing top 20 rows



In [16]:
countries.count()

288

In [17]:
stations_n_countries = spark.sql("""
SELECT A.STN_NO
     , B.COUNTRY_FULL
FROM stations A INNER JOIN countries B ON A.COUNTRY_ABBR = B.COUNTRY_ABBR
""")
stations_n_countries.createOrReplaceTempView('stations_n_countries')
stations_n_countries.show()

+------+--------------+
|STN_NO|  COUNTRY_FULL|
+------+--------------+
|012240|        NORWAY|
|020690|        SWEDEN|
|020870|        SWEDEN|
|021190|        SWEDEN|
|032690|UNITED KINGDOM|
|033450|UNITED KINGDOM|
|039290|UNITED KINGDOM|
|039790|       IRELAND|
|040480|       ICELAND|
|041300|       ICELAND|
|060100| FAROE ISLANDS|
|061443|       DENMARK|
|063401|   NETHERLANDS|
|071910|        FRANCE|
|092640|       GERMANY|
|123766|        POLAND|
|125990|        POLAND|
|129700|       HUNGARY|
|132240|       CROATIA|
|156500|      BULGARIA|
+------+--------------+
only showing top 20 rows



In [18]:
stations_n_countries.count()

25209

In [19]:
missing_countries = spark.sql("""

SELECT COUNTRY_ABBR
FROM (
        SELECT A.STN_NO
            , A.COUNTRY_ABBR
            , B.COUNTRY_FULL
        FROM stations A LEFT OUTER JOIN countries B ON A.COUNTRY_ABBR = B.COUNTRY_ABBR    
)
WHERE COUNTRY_FULL IS NULL

""")
missing_countries.count()

97

In [20]:
missing_countries.select('COUNTRY_ABBR').distinct().show()

+------------+
|COUNTRY_ABBR|
+------------+
|          UC|
|          RI|
|          OD|
|          AE|
|          KV|
|          MJ|
|          NN|
|          TT|
+------------+



UC stands for Union of the Comoros, based on https://planetarynames.wr.usgs.gov/Abbreviations

In [21]:
countries.where("COUNTRY_FULL LIKE '%COMOROS'").show()

+------------+------------+
|COUNTRY_ABBR|COUNTRY_FULL|
+------------+------------+
|          CN|     COMOROS|
+------------+------------+



In [22]:
stations.where("COUNTRY_ABBR = 'UC'").show()

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|789880|          UC|
+------+------------+



In [23]:
weather.where("STN_NO = 789880").show()

+------+----+----------+-----+-----+-------+-------+-----+-----+-----+-----+-----+-----+----+----+---+----+----+----+-------+-------+
|STN_NO|WBAN|    DT_REF| TEMP| DEWP|    SLP|    STP|VISIB| WDSP|MXSPD| GUST|  MAX|  MIN|PRCP|SNDP|FOG|RAIN|SNOW|HAIL|THUNDER|TORNADO|
+------+----+----------+-----+-----+-------+-------+-----+-----+-----+-----+-----+-----+----+----+---+----+----+----+-------+-------+
|789880| 412|2019-01-01|802.0|686.0|10142.0|10130.0| 82.0|138.0|181.0| null|871.0|754.0|null|null|  0|   0|   0|   0|      0|      0|
|789880| 412|2019-01-02|801.0|683.0|10144.0|10133.0| 80.0|149.0|190.0|289.0|862.0|766.0|null|null|  0|   0|   0|   0|      0|      0|
|789880| 412|2019-01-03|801.0|698.0|10138.0|10126.0| 81.0|147.0|190.0|270.0|858.0|766.0|null|null|  0|   0|   0|   0|      0|      0|
|789880| 412|2019-01-04|802.0|702.0|10155.0|10143.0| 74.0|139.0|190.0|299.0|860.0|770.0|null|null|  0|   1|   0|   0|      0|      0|
|789880| 412|2019-01-05|802.0|693.0|10149.0|10139.0| 83.0|121.

In [24]:
duplicated = [r['STN_NO'] for r in stations.groupBy('STN_NO').count().where('count > 1').select('STN_NO').collect()]
duplicated

['785145',
 '785140',
 '789880',
 '785265',
 '788660',
 '785510',
 '917920',
 '789900',
 '788730']

In [25]:
stations.where('STN_NO in (' + ', '.join(duplicated) + ')').orderBy('STN_NO', 'COUNTRY_ABBR').show()

+------+------------+
|STN_NO|COUNTRY_ABBR|
+------+------------+
|785140|          RQ|
|785140|          US|
|785145|          RQ|
|785145|          US|
|785265|          RQ|
|785265|          US|
|785510|          US|
|785510|          VQ|
|788660|          NN|
|788660|          NT|
|788730|          NL|
|788730|          NT|
|789880|          NT|
|789880|          UC|
|789900|          NL|
|789900|          NT|
|917920|          FJ|
|917920|          TN|
+------+------------+



In [26]:
weather = spark.sql("""
SELECT A.*
     , B.COUNTRY_FULL AS COUNTRY 
FROM weather A INNER JOIN stations_n_countries B ON A.STN_NO = B.STN_NO
""")
weather.createOrReplaceTempView('weather')
weather.show()

+------+----+----------+-----+-----+----+----+-----+-----+-----+-----+-----+-----+----+----+---+----+----+----+-------+-------+-------+
|STN_NO|WBAN|    DT_REF| TEMP| DEWP| SLP| STP|VISIB| WDSP|MXSPD| GUST|  MAX|  MIN|PRCP|SNDP|FOG|RAIN|SNOW|HAIL|THUNDER|TORNADO|COUNTRY|
+------+----+----------+-----+-----+----+----+-----+-----+-----+-----+-----+-----+----+----+---+----+----+----+-------+-------+-------+
|111710|null|2019-01-01|352.0|319.0|null|null| 55.0| 61.0|229.0|460.0|376.0|315.0|14.0|null|  0|   1|   1|   0|      0|      0|AUSTRIA|
|111710|null|2019-01-02|275.0|219.0|null|null|  6.0|172.0|272.0|490.0|376.0|212.0|16.0|null|  0|   1|   1|   0|      0|      0|AUSTRIA|
|111710|null|2019-01-03|205.0|160.0|null|null| 11.0|151.0|220.0|410.0|252.0|176.0|11.0|null|  0|   0|   1|   0|      0|      0|AUSTRIA|
|111710|null|2019-01-04|220.0|181.0|null|null| 18.0| 93.0|171.0|381.0|284.0|190.0|null|null|  0|   0|   1|   0|      0|      0|AUSTRIA|
|111710|null|2019-01-05|306.0|267.0|null|null|  

In [27]:
total = weather.where("COUNTRY = 'INDIA'").count()
total

64753

In [28]:
weather.where("COUNTRY = 'INDIA'") \
       .select([(100 * count(when(isnull(c), c)) / total).alias(c).cast(IntegerType()) for c in weather.columns]) \
       .show()

+------+----+------+----+----+---+---+-----+----+-----+----+---+---+----+----+---+----+----+----+-------+-------+-------+
|STN_NO|WBAN|DT_REF|TEMP|DEWP|SLP|STP|VISIB|WDSP|MXSPD|GUST|MAX|MIN|PRCP|SNDP|FOG|RAIN|SNOW|HAIL|THUNDER|TORNADO|COUNTRY|
+------+----+------+----+----+---+---+-----+----+-----+----+---+---+----+----+---+----+----+----+-------+-------+-------+
|     0| 100|     0|   0|   0| 32| 68|    0|   0|    6|  96|  0|  0|  78|  99|  0|   0|   0|   0|      0|      0|      0|
+------+----+------+----+----+---+---+-----+----+-----+----+---+---+----+----+---+----+----+----+-------+-------+-------+



In [29]:
weather.count()

3583663

In [38]:
grouped = spark.sql("""

SELECT COUNTRY
     , DT_REF
     , AVG(TEMP)    AS TEMP
     , AVG(DEWP)    AS DEWP
     , AVG(SLP)     AS SLP
     , AVG(STP)     AS STP
     , AVG(VISIB)   AS VISIB
     , AVG(WDSP)    AS WDSP
     , AVG(MXSPD)   AS MXSPD
     , AVG(GUST)    AS GUST
     , AVG(MAX)     AS MAX
     , AVG(MIN)     AS MIN
     , AVG(PRCP)    AS PRCP
     , AVG(SNDP)    AS SNDP
     , MAX(FOG)     AS FOG
     , MAX(RAIN)    AS RAIN
     , MAX(SNOW)    AS SNOW
     , MAX(HAIL)    AS HAIL
     , MAX(THUNDER) AS THUNDER
     , MAX(TORNADO) AS TORNADO
FROM weather
GROUP BY COUNTRY
       , DT_REF

""")

grouped.count()

75843

In [48]:
grouped.groupBy('COUNTRY').count().where('count < 365').orderBy('count').show()

+--------------------+-----+
|             COUNTRY|count|
+--------------------+-----+
| JUAN DE NOVA ISLAND|   37|
|             TOKELAU|   70|
|ST. VINCENT AND T...|   75|
|    PITCAIRN ISLANDS|   81|
|           SWAZILAND|  274|
|          MONTSERRAT|  295|
|             LESOTHO|  295|
|SAO TOME AND PRIN...|  307|
|        SIERRA LEONE|  313|
|              MALAWI|  323|
|               LIBYA|  326|
|            DJIBOUTI|  332|
|               ZAIRE|  333|
|             BURUNDI|  333|
|              GUINEA|  333|
|            CAMEROON|  336|
|CENTRAL AFRICAN R...|  336|
|   EQUATORIAL GUINEA|  336|
|            ZIMBABWE|  342|
|    CHRISTMAS ISLAND|  350|
+--------------------+-----+
only showing top 20 rows



In [49]:
grouped.repartition(5).write.csv('data/grouped', header=True)