In [1]:
# Display the version reported by the notebook-provided `sc` object.
sc.version

In [2]:
#importujemy pakiety

import pyspark.sql.types as typ
import pyspark.sql.functions as func
from pyspark.sql.functions import col
import pyspark.mllib.stat as st
import numpy as np
import pandas as pd
from itertools import chain
from pyspark.sql.functions import create_map, lit


In [3]:
# Build the (column name, Spark type) pairs used to declare the DataFrame schema.
# The first two columns have their own types; every remaining column is a double.
col_names = [
    ('dataid', typ.IntegerType()),
    ('local_15min', typ.StringType()),
] + [
    (meter, typ.DoubleType())
    for meter in (
        'air1', 'air2', 'air3', 'airwindowunit1', 'aquarium1',
        'bathroom1', 'bathroom2',
        'bedroom1', 'bedroom2', 'bedroom3', 'bedroom4', 'bedroom5',
        'battery1', 'car1', 'car2', 'circpump1',
        'clotheswasher1', 'clotheswasher_dryg1',
        'diningroom1', 'diningroom2', 'dishwasher1', 'disposal1',
        'drye1', 'dryg1', 'freezer1', 'furnace1', 'furnace2',
        'garage1', 'garage2', 'grid',
        'heater1', 'heater2', 'heater3',
        'housefan1', 'icemaker1', 'jacuzzi1',
        'kitchen1', 'kitchen2', 'kitchenapp1', 'kitchenapp2',
        'lights_plugs1', 'lights_plugs2', 'lights_plugs3',
        'lights_plugs4', 'lights_plugs5', 'lights_plugs6',
        'livingroom1', 'livingroom2', 'microwave1', 'office1',
        'outsidelights_plugs1', 'outsidelights_plugs2',
        'oven1', 'oven2', 'pool1', 'pool2', 'poollight1', 'poolpump1',
        'pump1', 'range1', 'refrigerator1', 'refrigerator2',
        'security1', 'sewerpump1', 'shed1', 'solar', 'solar2',
        'sprinkler1', 'sumppump1', 'utilityroom1', 'venthood1',
        'waterheater1', 'waterheater2', 'wellpump1', 'winecooler1',
        'leg1v', 'leg2v',
    )
]

In [4]:
# Turn the (name, type) pairs into a StructType; every field is declared nullable.
schema = typ.StructType(
    [typ.StructField(name, dtype, nullable=True) for name, dtype in col_names]
)
schema

In [5]:
# Load the Austin data file using the explicit schema; the file has a header row.
data_austin = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('/FileStore/tables/15minute_data_austin.csv')
)

In [6]:
# Preview the first rows of a few selected columns.
data_austin.select('dataid', 'local_15min', 'grid').show(n=5)

In [7]:
# Load the remaining regional files (California and New York) with the same schema.
data_california = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('/FileStore/tables/15minute_data_california.csv')
)

In [8]:
# Load the New York data file using the explicit schema; the file has a header row.
data_newyork = (
    spark.read
    .schema(schema)
    .option('header', True)
    .csv('/FileStore/tables/15minute_data_newyork.csv')
)

In [9]:
# Check for null values in each regional DataFrame.
# The per-column null counts are aggregated by Spark, so only a one-row result is
# converted to pandas instead of pulling the whole table to the driver.
data_austin.select(
    [func.count(func.when(col(c).isNull(), 1)).alias(c) for c in data_austin.columns]
).toPandas().T.rename(columns={0: 'null_count'})

In [10]:
# Null count per column for the California data, aggregated by Spark so that only
# a one-row result is converted to pandas (not the whole table).
data_california.select(
    [func.count(func.when(col(c).isNull(), 1)).alias(c) for c in data_california.columns]
).toPandas().T.rename(columns={0: 'null_count'})

In [11]:
# Null count per column for the New York data, aggregated by Spark so that only
# a one-row result is converted to pandas (not the whole table).
data_newyork.select(
    [func.count(func.when(col(c).isNull(), 1)).alias(c) for c in data_newyork.columns]
).toPandas().T.rename(columns={0: 'null_count'})

In [12]:
# Only 'dataid', 'local_15min' and 'grid' are kept for the rest of the analysis.
cols_selected = ['dataid', 'local_15min', 'grid']

# Apply the same column selection to each regional DataFrame.
data_austin_cut, data_california_cut, data_newyork_cut = (
    region.select(cols_selected)
    for region in (data_austin, data_california, data_newyork)
)

In [13]:
# Preview the reduced California data.
data_california_cut.show(n=10)

In [14]:
#łączymy ramki w jedną
import functools 

def unionAll(dfs):
    """Union a sequence of DataFrames into one.

    Each subsequent frame is first re-selected to the column list of the
    accumulated result, so frames are combined by column name rather than
    by the position of their columns.

    Args:
        dfs: non-empty iterable of DataFrames that share the same column names.

    Returns:
        The union of all frames; for a single-element input, that element itself.
    """
    def _append(combined, following):
        # Align the next frame to the accumulated column order before the union.
        return combined.union(following.select(combined.columns))

    return functools.reduce(_append, dfs)
  
# Stack the three reduced regional DataFrames into a single DataFrame.
data_all_regions = unionAll(
    [data_austin_cut, data_california_cut, data_newyork_cut]
)

In [15]:
# Inspect the combined data: null count per column.
# The counts are aggregated by Spark, so only a one-row result is converted to
# pandas instead of pulling all rows of all regions to the driver.
data_all_regions.select(
    [func.count(func.when(col(c).isNull(), 1)).alias(c) for c in data_all_regions.columns]
).toPandas().T.rename(columns={0: 'null_count'})

In [16]:
# Drop the records whose 'grid' value is null, then print how many rows remain.
data_all_notnull = data_all_regions.filter(col('grid').isNotNull())
print(data_all_notnull.count())



In [17]:
# Verify the filtered DataFrame: null count per column.
# The counts are aggregated by Spark, so only a one-row result is converted to
# pandas instead of pulling the whole table to the driver.
data_all_notnull.select(
    [func.count(func.when(col(c).isNull(), 1)).alias(c) for c in data_all_notnull.columns]
).toPandas().T.rename(columns={0: 'null_count'})

In [18]:
# Load the metadata file. Column types are inferred from the data (inferSchema)
# rather than declared up front. Uses the same `spark.read.csv` reader as the
# regional data files instead of the legacy sqlContext / 'com.databricks.spark.csv' form.
df_metadata = spark.read.csv(
    '/FileStore/tables/metadata.csv',
    header=True,
    inferSchema=True,
)


In [19]:
# From the metadata, keep only the dataid, city and state columns.
df_metadata_cut = df_metadata.select('dataid', 'city', 'state')


In [20]:
# Preview the reduced metadata.
df_metadata_cut.show(n=10)

In [21]:
# First step of building a dictionary from the dataid and city columns:
# take the reduced metadata as an RDD of rows.
# Note: despite its name, id_city_dict is an RDD at this point, not a dict.
id_city_dict=df_metadata_cut.rdd

In [22]:
# Turn each metadata row into a (dataid, city) key/value pair.
key_value_id_city = id_city_dict.map(lambda row: (row['dataid'], row['city']))

In [23]:
# Collect the key/value pairs into a plain Python dictionary.
ready_id_city_dict=key_value_id_city.collectAsMap()


In [24]:
# Print the collected dictionary to inspect its contents.
print(ready_id_city_dict)


In [25]:
# Build a Spark map expression from the house-id -> city dictionary.
# create_map receives the entries flattened as key1, value1, key2, value2, ...
mapping = create_map(
    [lit(item) for pair in ready_id_city_dict.items() for item in pair]
)



In [26]:
# Look up each row's dataid in the map expression and store the result as 'city'.
df_all_city = data_all_notnull.withColumn('city', mapping[col('dataid')])

In [27]:
# Preview the data with the added 'city' column.
df_all_city.show(n=5)

In [28]:
# Map each dataid to its state and attach the result as a 'state' column.
# The map expression gets its own name (state_mapping) so that the global `mapping`
# used to derive the 'city' column is not overwritten; re-running the city cell
# after this one therefore still produces city values.
id_state_dict = df_metadata_cut.rdd  # an RDD of metadata rows, despite the name
key_value_id_state = id_state_dict.map(lambda row: (row['dataid'], row['state']))
ready_id_state_dict = key_value_id_state.collectAsMap()
# create_map receives the entries flattened as key1, value1, key2, value2, ...
state_mapping = create_map([lit(x) for x in chain(*ready_id_state_dict.items())])
df_all_city_state = df_all_city.withColumn('state', state_mapping[df_all_city['dataid']])

In [29]:
# Preview the data with the added 'city' and 'state' columns.
df_all_city_state.show(n=10)

In [30]:
# Save the DataFrame as CSV so it can be downloaded to a local disk; detailed instructions:
# https://towardsdatascience.com/databricks-how-to-save-files-in-csv-on-your-local-computer-3d0c70e6a9ab
# coalesce(1) reduces the data to a single partition before writing.
# mode='overwrite' lets the notebook be re-run: with the default save mode the
# write fails once the target path already exists.
df_all_city_state.coalesce(1).write.csv(
    'dbfs:/FileStore/tables/data_dyplom_city_state.csv',
    header=True,
    mode='overwrite',
)