In [1]:
# basic libraries       
import os
import matplotlib.pyplot as plt
import matplotlib
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession, DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer, VectorAssembler, StringIndexer, VectorIndexer

# Reproducibility: seed NumPy's global random generator with a fixed value
state = 1
np.random.seed(seed=state)

# To plot pretty figures
import seaborn as sns
# Matplotlib 3.6 renamed its bundled seaborn style sheets to "seaborn-v0_8*"
# and later releases dropped the old names, so apply whichever name this
# installation actually ships instead of hard-coding one of them.
_mpl_style = next(
    (name for name in ('seaborn-v0_8', 'seaborn') if name in plt.style.available),
    None,
)
if _mpl_style is not None:
    plt.style.use(_mpl_style)
sns.set(style="ticks", color_codes=True)

# Directories
# The project root can be overridden with the PROJECT_ROOT_DIR environment
# variable so the notebook runs on other machines; the default is unchanged.
# Joining with '' guarantees a trailing separator, which callers rely on when
# they build paths by plain string concatenation (e.g. DATA_DIR + 'file.csv').
PROJECT_ROOT_DIR = os.path.join(
    os.environ.get('PROJECT_ROOT_DIR')
    or '/home/tulan/PycharmProjects/Google/Machine_Learning/Machine Learning With Big Data/',
    '',
)
# Both sub-directories are derived from the single root so they cannot drift apart.
DATA_DIR = os.path.join(PROJECT_ROOT_DIR, 'data', '')
IMAGE_DIR = os.path.join(PROJECT_ROOT_DIR, 'images')

def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    """Save the current matplotlib figure into ``IMAGE_DIR``.

    Parameters
    ----------
    fig_id : str
        File name without extension; also echoed in the progress message.
    tight_layout : bool, default True
        When true, call ``plt.tight_layout()`` before saving.
    fig_extension : str, default "png"
        Used both as the file suffix and as the ``format`` given to ``savefig``.
    resolution : int, default 300
        Dots per inch given to ``savefig``.
    """
    # exist_ok=True creates the directory only when needed and avoids the
    # race inherent in a separate os.path.exists() check.
    os.makedirs(IMAGE_DIR, exist_ok=True)
    path = os.path.join(IMAGE_DIR, fig_id + "." + fig_extension)
    print("Saving figure", fig_id)
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)

In [2]:
# Start a local-mode Spark session, or reuse one that is already running.
# The app name is the label shown for this job in the Spark UI and logs, so it
# names this analysis rather than the "Word Count" example it was copied from.
# The placeholder "spark.some.config.option" setting from the Spark docs was
# dropped: it is not a real Spark option and had no effect.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Minute Weather Analysis")
    .getOrCreate()
)

In [3]:
# Load the minute-level weather CSV: the first row supplies the column names
# and Spark infers each column's type from the data.
df = spark.read.csv(
    os.path.join(DATA_DIR, 'minute_weather.csv'),
    inferSchema=True,
    header=True,
)

In [4]:
# List the column names Spark took from the CSV header row.
df.columns

['rowID',
 'hpwren_timestamp',
 'air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'min_wind_direction',
 'min_wind_speed',
 'rain_accumulation',
 'rain_duration',
 'relative_humidity']

In [5]:
# Show the type and nullability Spark inferred for each column.
df.printSchema()

root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)



In [6]:
# Per-column summary statistics (count, mean, stddev, min, max), collected
# into pandas and flipped so that each source column becomes one row.
df.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
rowID,1587257,793628.0,458201.77244910353,0,1587256
air_pressure,1587257,916.8301266904576,3.0515931266813565,905.0,929.5
air_temp,1587257,61.851440428338314,11.833623786835963,31.64,99.5
avg_wind_direction,1586824,161.96537927331576,95.20811970204333,0.0,359.0
avg_wind_speed,1586824,2.7742720679795654,2.0607577935630297,0.0,32.3
max_wind_direction,1586824,163.40304784903682,92.36723428064334,0.0,359.0
max_wind_speed,1586824,3.3998134008568908,2.4231674336170603,0.1,36.0
min_wind_direction,1586824,166.82637078844283,97.46274620077509,0.0,359.0
min_wind_speed,1586824,2.1331304542931133,1.7453450849326482,0.0,32.0


In [8]:
# Total number of rows in the loaded DataFrame.
df.count()

1587257

In [9]:
# Down-sample: keep only the rows whose rowID is a multiple of 10,
# then report how many rows remain.
filteredDF = df.where(df['rowID'] % 10 == 0)
filteredDF.count()

158726

In [10]:
# Count the sampled rows whose rain_accumulation is exactly zero.
filteredDF.where(filteredDF['rain_accumulation'] == 0).count()

157812

In [12]:
# Discard the two rain columns and the timestamp, then remove every row
# that still contains a null in any remaining column.
workingDF = (
    filteredDF
    .drop('rain_accumulation', 'rain_duration', 'hpwren_timestamp')
    .na.drop()
)

In [13]:
# Number of rows in workingDF.
workingDF.count()

158680

In [16]:
# Correlation between minimum and maximum wind speed, computed on the sampled
# rows that have no null in any column.
# NOTE(review): this starts from filteredDF rather than workingDF, so nulls in
# the rain/timestamp columns also exclude rows here; confirm that is intended.
filteredDF.na.drop().corr('min_wind_speed', 'max_wind_speed')

0.9257915876425128