In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode

In [3]:
# Obtain the session through the builder rather than calling the constructor
# with `sc`, a name that no visible cell defines. getOrCreate() returns the
# already-running session (reusing the kernel's SparkContext) or starts a new
# one, so this cell also works on a kernel that does not pre-inject `sc`.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Load every object matching the glob under the 2017/05/01/01 prefix as JSON.
# No schema is supplied, so Spark derives it from the data itself.
df = spark.read.format("json").load("s3a://finalprojectweather/2017/05/01/01/*")

In [5]:
# Print the schema of the loaded frame so the nested structs and the `weather`
# array selected by the later cells can be checked.
df.printSchema()

root
 |-- base: string (nullable = true)
 |-- clouds: struct (nullable = true)
 |    |-- all: long (nullable = true)
 |-- cod: long (nullable = true)
 |-- coord: struct (nullable = true)
 |    |-- lat: double (nullable = true)
 |    |-- lon: double (nullable = true)
 |-- dt: long (nullable = true)
 |-- id: long (nullable = true)
 |-- main: struct (nullable = true)
 |    |-- humidity: long (nullable = true)
 |    |-- pressure: long (nullable = true)
 |    |-- temp: double (nullable = true)
 |    |-- temp_max: double (nullable = true)
 |    |-- temp_min: double (nullable = true)
 |-- name: string (nullable = true)
 |-- sys: struct (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- message: double (nullable = true)
 |    |-- sunrise: long (nullable = true)
 |    |-- sunset: long (nullable = true)
 |    |-- type: long (nullable = true)
 |-- visibility: long (nullable = true)
 |-- weather: array (nullable = true)
 |    |-- element: s

In [6]:
# Distinct combinations of the location-level fields, with nested values
# pulled up to top-level columns.
main_table = df.select(
    'id',
    'name',
    'base',
    'sys.type',
    'sys.country',
    'coord.*',  # expands the coord struct into its lat / lon fields
).distinct()

In [8]:
# Preview main_table to check the projected columns and the de-duplication.
main_table.show()

+-------+-------------+--------+----+-------+-----+-------+
|     id|         name|    base|type|country|  lat|    lon|
+-------+-------------+--------+----+-------+-----+-------+
|5391959|San Francisco|stations|   1|     US|37.77|-122.42|
|5809844|      Seattle|stations|   1|     US|47.61|-122.33|
+-------+-------------+--------+----+-------+-----+-------+



In [9]:
# Distinct (id, sys.id) pairs; sys.id is renamed to sys_id so it does not
# clash with the top-level id column. The displayed output shows that one id
# can be paired with more than one sys_id.
second_table = (
    df.selectExpr('id', 'sys.id as sys_id')
      .distinct()
)

In [10]:
# Preview the distinct (id, sys_id) pairs.
second_table.show()

+-------+------+
|     id|sys_id|
+-------+------+
|5809844|  2949|
|5391959|   226|
|5391959|  4250|
+-------+------+



In [11]:
# Build the per-reading table in two projections:
#   1. explode the `weather` array so each element becomes its own row
#      (aliased `tmp`), carrying along the columns needed afterwards;
#   2. flatten the fields of `tmp`, `sys` and `wind` into top-level columns.
# NOTE(review): in the displayed output, message / sunrise / sunset differ
# between otherwise identical readings, so distinct() keeps those rows apart.
third_table = (
    df.select(
        explode(df['weather']).alias('tmp'),
        'id',
        'sys',
        'visibility',
        'wind',
    )
    .selectExpr(
        'id',
        'tmp.description',
        'tmp.icon',
        'tmp.id as weather_id',  # renamed to avoid clashing with the top-level id
        'tmp.main',
        'sys.message',
        'sys.sunrise',
        'sys.sunset',
        'visibility',
        'wind.*',
    )
    .distinct()
)

In [12]:
# Preview the flattened weather readings.
third_table.show()

+-------+----------------+----+----------+------+-------+----------+----------+----------+---+-----+
|     id|     description|icon|weather_id|  main|message|   sunrise|    sunset|visibility|deg|speed|
+-------+----------------+----+----------+------+-------+----------+----------+----------+---+-----+
|5391959|   broken clouds| 04n|       803|Clouds| 0.0134|1493644408|1493694032|     10000|260|  3.1|
|5809844|      light rain| 10n|       500|  Rain| 0.0054|1493643090|1493695326|     16093| 90|  2.6|
|5809844|      light rain| 10n|       500|  Rain| 0.0056|1493643089|1493695327|     16093| 90|  2.6|
|5809844|            mist| 50n|       701|  Mist|  0.225|1493643087|1493695328|     16093| 80|  3.1|
|5391959|scattered clouds| 03n|       802|Clouds| 0.2632|1493644409|1493694031|     16093|150|  6.2|
|5809844|      light rain| 10n|       500|  Rain| 0.7779|1493643089|1493695326|     16093| 90|  2.6|
|5809844|   moderate rain| 10d|       501|  Rain| 2.2124|1493643093|1493695323|     16093| 

In [None]:
# Persist each derived table as Parquet under its own S3 prefix.
# NOTE(review): no save mode is passed, so Spark's default applies and a
# re-run is expected to fail if a destination already exists -- confirm
# whether mode="overwrite" is wanted here.
for table, destination in (
    (main_table, "s3://hardoopmapreduce/project/table1"),
    (second_table, "s3://hardoopmapreduce/project/table2"),
    (third_table, "s3://hardoopmapreduce/project/table3"),
):
    table.write.parquet(destination)