# Imports and Spark setup

In [1]:
import sys, os
sys.path.insert(0, os.path.abspath('..'))
import findspark
findspark.init()
import pyspark
import random
from pyspark.sql import SparkSession
from pathlib import Path
from src.utils import DisplayablePath
from pyspark.sql import SQLContext


In [52]:
def spark_read_csv(filepath):
    """Load a CSV file into a Spark DataFrame via the notebook's `spark` session.

    The first line of the file is treated as a header and column types are inferred.

    Parameters
    ----------
    filepath : path of the CSV file, handed to the reader's ``load`` call unchanged.

    Returns
    -------
    The DataFrame produced by the reader.
    """
    reader = spark.read.format("csv").options(inferSchema="true", header="true")
    return reader.load(filepath)

In [69]:
def dataframe_summary(df):
    """Print a short overview of a DataFrame: a separator line, its schema and its row count.

    Parameters
    ----------
    df : object exposing ``printSchema()`` and ``count()`` (e.g. a Spark DataFrame).

    Returns
    -------
    None. Everything is written to standard output.
    """
    print("----------------------")
    # printSchema() writes the schema itself and returns None; wrapping it in print()
    # would emit a stray "None" line after the schema.
    df.printSchema()
    print(f"There are {df.count()} rows")
    

In [2]:
def stop_spark():
    """Stop the notebook's SparkContext (`sc`) and pass back whatever ``stop()`` returns."""
    outcome = sc.stop()
    return outcome

In [11]:
def test():
    num_samples = 10000
    def inside(p):     
        x, y = random.random(), random.random()
        return x*x + y*y < 1

    count = sc.parallelize(range(0, num_samples)).filter(inside).count()

    pi = 4 * count / num_samples
    return print(pi)

In [4]:
# Application name used for this notebook's Spark session.
APPNAME = "ClimateChange"

In [9]:
# Create (or reuse) the SparkSession for this notebook and expose its SparkContext as `sc`.
spark = SparkSession.builder.appName(APPNAME).getOrCreate()
sc = spark.sparkContext
# SQLContext's first positional parameter is a SparkContext, not a SparkSession:
# pass the context explicitly and hand over the existing session as the second argument.
sqlContext = SQLContext(sc, spark)

In [12]:
# Sanity-check the Spark setup: runs the Monte Carlo pi estimate in test() and prints it.
test()

[Stage 0:>                                                          (0 + 4) / 4]

3.142


                                                                                

In [7]:
# NOTE(review): this stops the SparkContext. The data-import cells further down call
# spark_read_csv(), which reads through the `spark` session, so under Restart & Run All
# they would run against a stopped context unless the session is recreated first.
# Consider moving this call to the very end of the notebook.
stop_spark()

# Data Imports

In [14]:
# Root folder of the raw data files, kept as a plain string.
DATA_DIR = r"/Users/jamesmoro/Documents/Python/ClimateData/data/raw"

# Build the directory tree for that folder and print it one entry per line.
paths = DisplayablePath.make_tree(Path(DATA_DIR))
for path in paths:
    rendered = path.displayable()
    print(rendered)

raw/
├── .DS_Store
├── .gitkeep
├── climate_change_download_0.xls
├── GlobalTemperatures/
│   ├── GlobalLandTemperaturesByCity.csv
│   ├── GlobalLandTemperaturesByCountry.csv
│   ├── GlobalLandTemperaturesByMajorCity.csv
│   ├── GlobalLandTemperaturesByState.csv
│   ├── GlobalTemperatures.csv
│   └── metadata.txt
└── UNGreenhouseGasInventoryData /
    ├── .DS_Store
    ├── Co2.csv
    ├── Hydrofluorocarbons.csv
    ├── metadata.txt
    ├── Methane.csv
    ├── Nitrogen_trifluoride .csv
    ├── Nitrous_oxide.csv
    ├── Perfluorocarbons.csv
    └── Sulphur_hexafluoride.csv


In [15]:
# Folder with the global temperature CSVs, and the base names (without extension)
# of the files inside it.
GLOBALTEMP_DIR = Path(r"/Users/jamesmoro/Documents/Python/ClimateData/data/raw") / "GlobalTemperatures/"
GLOBALTEMPS_FILES = [
    "GlobalLandTemperaturesByCity", "GlobalLandTemperaturesByCountry",
    "GlobalLandTemperaturesByMajorCity", "GlobalLandTemperaturesByState",
    "GlobalTemperatures",
]

In [16]:
# Full path of every global-temperature CSV; converted back to str so Spark can read it.
globaltemps_paths = [
    str(GLOBALTEMP_DIR / f"{file}.csv")
    for file in GLOBALTEMPS_FILES
]

In [17]:
# Folder with the UN greenhouse-gas inventory CSVs, and the base names (without
# extension) of the files inside it.
GREENHOUSEGAS_DIR = Path(r"/Users/jamesmoro/Documents/Python/ClimateData/data/raw") / "UNGreenhouseGasInventoryData/"
GREENHOUSEGAS_FILES = [
    "Co2", "Hydrofluorocarbons", "Methane", "Nitrogen_trifluoride",
    "Nitrous_oxide", "Perfluorocarbons", "Sulphur_hexafluoride",
]

In [20]:
# Full path of every greenhouse-gas CSV, as plain strings.
greenhousegas_paths = [
    str(GREENHOUSEGAS_DIR / f"{file}.csv")
    for file in GREENHOUSEGAS_FILES
]

In [70]:
# Preview the first two rows of the first greenhouse-gas CSV (index 0 of greenhousegas_paths).
spark_read_csv(greenhousegas_paths[0]).show(2)

+---------------+----+----------------+
|Country or Area|Year|           Value|
+---------------+----+----------------+
|      Australia|2018|415953.946668257|
|      Australia|2017| 415097.42766819|
+---------------+----+----------------+
only showing top 2 rows



In [99]:
# Row count of each greenhouse-gas CSV, keyed by the file name without its extension.
greenhouse_gas_count = {
    Path(csv_path).stem: spark_read_csv(csv_path).count()
    for csv_path in greenhousegas_paths
}
greenhouse_gas_count

{'Co2': 1247,
 'Hydrofluorocarbons': 1149,
 'Methane': 1247,
 'Nitrogen_trifluoride': 316,
 'Nitrous_oxide': 1247,
 'Perfluorocarbons': 978,
 'Sulphur_hexafluoride': 1207}