## Imports & Configuration

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import lag 
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import col,isnan, when, count, countDistinct


In [2]:
spark = SparkSession \
    .builder \
    .appName("Data_Exploration") \
    .getOrCreate()

23/07/06 13:30:09 WARN Utils: Your hostname, nuno-g14 resolves to a loopback address: 127.0.1.1; using 192.168.1.161 instead (on interface wlp2s0)
23/07/06 13:30:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/07/06 13:30:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Auxiliary Functions

In [3]:
month_name = {
            'janeiro':'01',
            'fevereiro':'02',
            'março':'03',
            'abril':'04',
            'maio':'05',
            'junho':'06',
            'julho':'07',
            'agosto':'08',
            'setembro':'09',
            'outubro':'10',
            'novembro':'11',
            'dezembro':'12'}

In [4]:
def deal_with_null_date(dataframe):
    new_date_list = []

    for row in dataframe.rdd.collect():
        new_date_list.append(row.Date) 

    for index, elem in enumerate(new_date_list):
        if elem == None:
            new_date_list[index] = new_date_list[index-1]
    
    pandas_df = dataframe.toPandas()
    pandas_df = pandas_df.assign(Date=new_date_list)
    new_dataframe = spark.createDataFrame(pandas_df) 
    return new_dataframe

In [5]:
def deal_with_extense_date(dataframe):
    new_date_list = []

    for row in dataframe.rdd.collect():
        new_date = row.Date.split(' ')
        new_month = month_name[new_date[0].lower()]
        new_year = new_date[2]
        new_date_list.append(new_year + '/' + new_month) 

    
    pandas_df = dataframe.toPandas()
    pandas_df = pandas_df.assign(Date=new_date_list)
    new_dataframe = spark.createDataFrame(pandas_df) 
    return new_dataframe

In [6]:
def deal_with_bad_values(dataframe):
    for col in dataframe.columns:
        if col != 'Date':
            dataframe = dataframe.withColumn(col, dataframe[col].cast(DoubleType()))
    return dataframe

# Data Reading

In [7]:
frame = spark.read.parquet("../MSc_Model_Datasets/Data_INE.parquet")

# Data Cleaning and Exploration 

In [8]:
frame = deal_with_extense_date(frame)

                                                                                

23/07/06 13:30:20 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [9]:
frame = deal_with_bad_values(frame)

In [10]:
frame.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in frame.columns]).show()

[Stage 3:>                                                        (0 + 16) / 16]

+----+-----------------+--------------+---------------+------------+-----------------+----------------+---------------+----------------+---------------------+------------------+-------------------+----------------+---------------------+--------------------+-------------------+--------------------+--------------+-----------+------------+---------+--------------+-------------+------------+-------------+-----------------+--------------+---------------+------------+-----------------+----------------+---------------+----------------+---------------+-------------+-----------+------------+--------------+------------------+--------------------+------------+------------------+-------------+---------------+----------------+------------+-----------+------------+-------------+---------------+-------------+----------------+----------------+-----------------+--------------+---------------+------------+-----------------+----------------+---------------+----------------+---------------------+---------

                                                                                

In [11]:
frame = frame.withColumnRenamed("Landed_Santa Maria","Landed_Santa_Maria")

In [12]:
frame.count()

671

In [13]:
frame.describe().show()



23/03/15 23:52:23 WARN DAGScheduler: Broadcasting large task binary with size 1554.3 KiB


                                                                                

+-------+-------+------------------+-----------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+---------------------+------------------+-------------------+-----------------+---------------------+--------------------+-------------------+--------------------+------------------+-----------------+-----------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+--------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------+-----------------+-----------------+------------------+

In [14]:
frame.printSchema()

root
 |-- Date: string (nullable = true)
 |-- TE_Total_Portugal: double (nullable = true)
 |-- TE_Total_North: double (nullable = true)
 |-- TE_Total_Center: double (nullable = true)
 |-- TE_Total_LMA: double (nullable = true)
 |-- TE_Total_Alentejo: double (nullable = true)
 |-- TE_Total_Algarve: double (nullable = true)
 |-- TE_Total_Azores: double (nullable = true)
 |-- TE_Total_Madeira: double (nullable = true)
 |-- TE_Hotelaria_Portugal: double (nullable = true)
 |-- TE_Hotelaria_North: double (nullable = true)
 |-- TE_Hotelaria_Center: double (nullable = true)
 |-- TE_Hotelaria_LMA: double (nullable = true)
 |-- TE_Hotelaria_Alentejo: double (nullable = true)
 |-- TE_Hotelaria_Algarve: double (nullable = true)
 |-- TE_Hotelaria_Azores: double (nullable = true)
 |-- TE_Hotelaria_Madeira: double (nullable = true)
 |-- TE_AL_Portugal: double (nullable = true)
 |-- TE_AL_North: double (nullable = true)
 |-- TE_AL_Center: double (nullable = true)
 |-- TE_AL_LMA: double (nullable = tru

In [15]:
for col in frame.columns:
    lst = frame.select(col).distinct().collect()
    print(col)
    for item in lst:
        print(item)
    print()

                                                                                

Date
Row(Date='1997/04')
Row(Date='1978/04')
Row(Date='1982/04')
Row(Date='1983/04')
Row(Date='1966/04')
Row(Date='1981/04')
Row(Date='1985/04')
Row(Date='1991/04')
Row(Date='1996/04')
Row(Date='1988/04')
Row(Date='1992/04')
Row(Date='1994/04')
Row(Date='2004/04')
Row(Date='2006/04')
Row(Date='1975/04')
Row(Date='1998/04')
Row(Date='1999/04')
Row(Date='1964/04')
Row(Date='1987/04')
Row(Date='1993/04')
Row(Date='1984/04')
Row(Date='1967/04')
Row(Date='2007/04')
Row(Date='1963/04')
Row(Date='1989/04')
Row(Date='1972/04')
Row(Date='1990/04')
Row(Date='1970/04')
Row(Date='1973/04')
Row(Date='1974/04')
Row(Date='1965/04')
Row(Date='1977/04')
Row(Date='1980/04')
Row(Date='1976/04')
Row(Date='1968/04')
Row(Date='2005/04')
Row(Date='1979/04')
Row(Date='1969/04')
Row(Date='1971/04')
Row(Date='1995/04')
Row(Date='1986/04')
Row(Date='1965/08')
Row(Date='2021/04')
Row(Date='2016/04')
Row(Date='1985/08')
Row(Date='2020/04')
Row(Date='1976/08')
Row(Date='1979/08')
Row(Date='2008/04')
Row(Date='1977/

                                                                                

TE_Total_Portugal
Row(TE_Total_Portugal=None)
Row(TE_Total_Portugal=386393.0)
Row(TE_Total_Portugal=303450.0)
Row(TE_Total_Portugal=4468.0)
Row(TE_Total_Portugal=290595.0)
Row(TE_Total_Portugal=334929.0)
Row(TE_Total_Portugal=47018.0)
Row(TE_Total_Portugal=555291.0)
Row(TE_Total_Portugal=518772.0)
Row(TE_Total_Portugal=638292.0)
Row(TE_Total_Portugal=793794.0)
Row(TE_Total_Portugal=325164.0)
Row(TE_Total_Portugal=592252.0)
Row(TE_Total_Portugal=187667.0)
Row(TE_Total_Portugal=173946.0)
Row(TE_Total_Portugal=152463.0)
Row(TE_Total_Portugal=53346.0)
Row(TE_Total_Portugal=205269.0)
Row(TE_Total_Portugal=153269.0)
Row(TE_Total_Portugal=194421.0)
Row(TE_Total_Portugal=147256.0)
Row(TE_Total_Portugal=18256.0)
Row(TE_Total_Portugal=164729.0)
Row(TE_Total_Portugal=172733.0)
Row(TE_Total_Portugal=131933.0)
Row(TE_Total_Portugal=174713.0)
Row(TE_Total_Portugal=164943.0)
Row(TE_Total_Portugal=106775.0)
Row(TE_Total_Portugal=150150.0)
Row(TE_Total_Portugal=32294.0)
Row(TE_Total_Portugal=681297.0)


KeyboardInterrupt: 

# Data Writing

In [17]:
frame.write.parquet("../MSc_Model_Datasets/Data_INE_Clean.parquet")

[Stage 37:>                                                       (0 + 16) / 16]

23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers
23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 75,08% for 9 writers
23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 84,47% for 8 writers
23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers
23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014 bytes) of heap memory
Scaling row group sizes to 96,54% for 7 writers
23/03/16 00:20:51 WARN MemoryManager: Total allocation exceeds 95,00% (906.992.014

                                                                                