In [1]:
import json

from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import IntegerType, TimestampType

### Initiate spark session

In [2]:
spark = SparkSession.builder \
    .appName('Cognitivo AI') \
    .master('local[*]') \
    .getOrCreate()

### Functions to help data processing

In [3]:
def json_to_dict(filename: str) -> dict:
   with open(filename) as f:
       return(json.load(f))

In [4]:
def change_column_type(df: DataFrame, column_name: str, column_type: str) -> DataFrame:
    df = df.withColumn(
        column_name, 
        df[column_name].cast(eval(f'{column_type.capitalize()}Type()'))
    )
    return df

### Parameters

In [5]:
input_file = 'data/input/users/load.csv'
output_folder = 'data/output/'
schema_file = 'config/types_mapping.json'
schema_dict = json_to_dict(schema_file)

In [6]:
df = spark.read.csv(input_file, header=True)
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: string (nullable = true)
 |-- create_date: string (nullable = true)
 |-- update_date: string (nullable = true)



## Convert the dataset to parquet

In [7]:
df.coalesce(1).write.mode("overwrite").parquet(f'{output_folder}1')

## Changing column types

In [8]:
schema_dict.update({'id': 'integer'})

In [9]:
for key, value in schema_dict.items():
    df = change_column_type(df, key, value)

In [10]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- address: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- create_date: timestamp (nullable = true)
 |-- update_date: timestamp (nullable = true)



#### Instead of using coalesce we could use repartition if we wanted to turn the grouped by date.

In [11]:
df.coalesce(1).write.mode("overwrite").parquet(f'{output_folder}2')

## Keep only the last id of the dataset

In [12]:
df.head(5)

[Row(id=1, name='david.lynch@cognitivo.ai', email='David Lynch', phone='(11) 99999-9997', address='Mulholland Drive, Los Angeles, CA, US', age=72, create_date=datetime.datetime(2018, 3, 3, 18, 47, 1, 954752), update_date=datetime.datetime(2018, 3, 3, 18, 47, 1, 954752)),
 Row(id=1, name='david.lynch@cognitivo.ai', email='David Lynch', phone='(11) 99999-9998', address='Mulholland Drive, Los Angeles, CA, US', age=72, create_date=datetime.datetime(2018, 3, 3, 18, 47, 1, 954752), update_date=datetime.datetime(2018, 4, 14, 17, 9, 48, 558151)),
 Row(id=2, name='sherlock.holmes@cognitivo.ai', email='Sherlock Holmes', phone='(11) 94815-1623', address='221B Baker Street, London, UK', age=34, create_date=datetime.datetime(2018, 4, 21, 20, 21, 24, 364752), update_date=datetime.datetime(2018, 4, 21, 20, 21, 24, 364752)),
 Row(id=3, name='spongebob.squarepants@cognitivo.ai', email='Spongebob Squarepants', phone='(11) 91234-5678', address='124 Conch Street, Bikini Bottom, Pacific Ocean', age=13, cre

In [13]:
df.createOrReplaceTempView('users')

In [14]:
df = spark.sql(sqlQuery = '''
    SELECT 
        *
    FROM (
        SELECT 
            *,
            dense_rank() OVER (PARTITION BY id ORDER BY update_date DESC) AS rank
        FROM users
    ) a WHERE rank = 1
''');
df.drop('rank')

DataFrame[id: int, name: string, email: string, phone: string, address: string, age: int, create_date: timestamp, update_date: timestamp]

In [15]:
df.head(5)

[Row(id=1, name='david.lynch@cognitivo.ai', email='David Lynch', phone='(11) 99999-9999', address='Mulholland Drive, Los Angeles, CA, US', age=72, create_date=datetime.datetime(2018, 3, 3, 18, 47, 1, 954752), update_date=datetime.datetime(2018, 5, 23, 10, 13, 59, 594752), rank=1),
 Row(id=3, name='spongebob.squarepants@cognitivo.ai', email='Spongebob Squarepants', phone='(11) 98765-4321', address='122 Conch Street, Bikini Bottom, Pacific Ocean', age=13, create_date=datetime.datetime(2018, 5, 19, 4, 7, 6, 854752), update_date=datetime.datetime(2018, 5, 19, 5, 8, 7, 964752), rank=1),
 Row(id=2, name='sherlock.holmes@cognitivo.ai', email='Sherlock Holmes', phone='(11) 94815-1623', address='221B Baker Street, London, UK', age=34, create_date=datetime.datetime(2018, 4, 21, 20, 21, 24, 364752), update_date=datetime.datetime(2018, 4, 21, 20, 21, 24, 364752), rank=1)]

In [16]:
df.coalesce(1).write.mode("overwrite").parquet(f'{output_folder}3')