## Current date variables (simulated)
### (please set _pt_year_ and _pt_month_)

In [28]:
pt_year = 2020
pt_month = 5

# Quarter of pt_month: start at 1 and add one for every quarter-end month
# (3, 6, 9) that pt_month lies beyond.
pt_quarter = 1 + sum(pt_month > quarter_end for quarter_end in (3, 6, 9))

## Create a Spark session

In [1]:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Environment consumed by PySpark when the session is started.
# NOTE(review): these are machine-specific absolute paths — adjust for your host.
os.environ.update({
    "SPARK_HOME": "/usr/local/spark",
    "PYSPARK_PYTHON": "/home/pigidser/anaconda3/bin/python3",
    "PYSPARK_DRIVER_PYTHON": "python3",
    "PYSPARK_SUBMIT_ARGS": "pyspark-shell",
})

In [2]:
# Spark master URL used for the session below.
master = "local"

# Reuse an existing session if there is one, otherwise create it.
spark = (
    SparkSession.builder
    .master(master)
    .appName("spark_test")
    .getOrCreate()
)

## RAW --> STAGE

In [3]:
! rm data/stage/*.*

In [4]:
# transform json-files (capital, continent, currency, iso3, names, phone) to csv

import json
import csv
import subprocess

def transform_json_to_csv(file_name):
    """Convert data/raw/<file_name>.json into data/stage/<file_name>.csv.

    The JSON file is expected to hold a single flat object. Each key/value
    pair becomes one two-column row ``key,value``; no header line is written.

    Rows are written with ``csv.writer`` so that a value containing a comma,
    a double quote or a line break is quoted instead of spilling into extra
    columns. ``None`` values are written as an empty field.

    Args:
        file_name: base name of the file, without directory or extension.

    Raises:
        FileNotFoundError: if the input file or the data/stage folder is missing.
        json.JSONDecodeError: if the input file is not valid JSON.
    """
    # Explicit UTF-8 keeps the result independent of the platform default encoding.
    with open("data/raw/"+file_name+".json", "r", encoding="utf-8") as fp:
        data_dict = json.load(fp)

    # newline="" hands line-ending control to the csv module; "\n" keeps the
    # same row terminator the file had before.
    with open("data/stage/"+file_name+".csv", "w", encoding="utf-8", newline="") as f:
        writer = csv.writer(f, lineterminator="\n")
        writer.writerows(data_dict.items())

# Base names of the JSON lookups in data/raw that are converted to CSV in data/stage.
files_to_process = ['capital','continent','currency','iso3','names', 'phone']
for json_name in files_to_process:
    transform_json_to_csv(json_name)

In [5]:
# countries_of_the_world.csv uses a comma both as the decimal separator inside quoted numbers and as the field delimiter --> replace the decimal comma with a point
# also collapse runs of whitespace into a single space

import re

def _clean_csv(src_path, dst_path, substitutions):
    """Copy src_path to dst_path line by line, applying regex substitutions.

    Args:
        src_path: path of the CSV file to read.
        dst_path: path of the cleaned CSV file to write (overwritten).
        substitutions: ordered iterable of (pattern, replacement) pairs; each
            pair is applied to every line with ``re.sub`` in the given order.
    """
    # Both files are closed by the `with` statement; no explicit close() is needed.
    # Explicit UTF-8 keeps the result independent of the platform default encoding.
    with open(src_path, "r", encoding="utf-8") as f_input, \
            open(dst_path, "w", encoding="utf-8") as f_output:
        for line in f_input:
            for pattern, replacement in substitutions:
                line = re.sub(pattern, replacement, line)
            # The whitespace-collapsing rule turns the line break into a trailing
            # space; strip it so the last field does not end with a blank.
            f_output.write(line.rstrip() + "\n")


_clean_csv(
    "data/raw/countries_of_the_world.csv",
    "data/stage/country.csv",
    [
        # decimal comma inside a quoted number (optionally negative) -> point, quotes dropped
        (r'"(-?\d+),(\d+)"', r'\1.\2'),
        # runs of whitespace -> single space
        (r'\s+', r' '),
        # no whitespace before a delimiter
        (r'\s,', r','),
        # no whitespace before a closing quote that is followed by a delimiter
        (r'\s+",', r'",'),
        # no whitespace right after a quote
        (r'"\s+', r'"'),
    ],
)

_clean_csv(
    "data/raw/nobel_laureates.csv",
    "data/stage/nobel_laureates.csv",
    [
        # runs of double quotes -> a single double quote
        (r'[\"]+', '"'),
        # runs of whitespace -> single space
        (r'\s+', r' '),
        # no whitespace before a delimiter
        (r'\s,', r','),
    ],
)

<function TextIOWrapper.close()>

In [6]:
# Copy worldcitiespop.csv from the raw folder to the stage folder as-is
# (no transformation is applied to this file).
! cp data/raw/worldcitiespop.csv data/stage/

Now we have the following CSV files in the stage folder:
- capital
- continent
- currency
- iso3
- names
- phone
- country
- nobel_laureates
- worldcitiespop

## Load data to Spark

In [38]:
# Headerless two-column lookups (code, value): the schema is given explicitly.
capital, continent, currency, iso3, names, phone = [
    spark.read.csv(path, schema=schema, header=False)
    for path, schema in (
        ("data/stage/capital.csv", "code string, capital string"),
        ("data/stage/continent.csv", "code string, continent string"),
        ("data/stage/currency.csv", "code string, currency string"),
        ("data/stage/iso3.csv", "code string, iso3 string"),
        ("data/stage/names.csv", "code string, name string"),
        ("data/stage/phone.csv", "code string, phone string"),
    )
]

# Files with a header row: column names are taken from the first line.
countries, laureates, worldcitiespop = [
    spark.read.csv(path, header=True)
    for path in (
        "data/stage/country.csv",
        "data/stage/nobel_laureates.csv",
        "data/stage/worldcitiespop.csv",
    )
]

In [39]:
import re

def _standardize_name(column):
    """Return `column` lower-cased, with punctuation and whitespace turned into '_'."""
    # Runs of . whitespace ( ) / $ % become a single underscore.
    new_column = re.sub(r'[\.\s\(\)\/\$\%]+','_',column.lower().strip())
    # Collapse repeated underscores.
    new_column = re.sub(r'[\_]+','_',new_column)
    # if name like abcdefgh_ remove "_"
    return re.sub(r'(\S+)_\b', r'\1', new_column)


def standartize_column_name(rename_df):
    """Return `rename_df` with every column renamed to its standardized form.

    Columns are renamed positionally in a single ``toDF`` call instead of one
    ``withColumnRenamed`` call per column: this adds one projection rather than
    one per column, and a column can never be renamed twice because an earlier
    rename happened to produce a name that matches a later original name.

    Args:
        rename_df: a DataFrame exposing ``columns`` and ``toDF(*names)``.

    Returns:
        A new DataFrame with the same data and standardized column names.
    """
    new_columns = [_standardize_name(column) for column in rename_df.columns]
    return rename_df.toDF(*new_columns)

In [40]:
# Join every lookup onto `capital` by the shared key column name 'code'
# (joining on the name keeps a single 'code' column in the result).
country_properties = capital
for lookup in (continent, currency, iso3, names, phone):
    country_properties = country_properties.join(lookup, 'code')

In [41]:
# Apply the column-name standardization to every loaded frame, rebinding each name.
country_properties, countries, laureates, worldcitiespop = [
    standartize_column_name(frame)
    for frame in (country_properties, countries, laureates, worldcitiespop)
]

## Data Mart preparation

In [42]:
# Attach the per-country properties to the country statistics, matching on the
# country name. Left join: rows without a matching name keep null properties.
# The duplicate 'name' column is dropped after the join.
countries = countries\
    .join(country_properties.select('code','capital','continent','currency','iso3','name','phone'),countries['country']==country_properties['name'],'left')\
    .drop("name")


# Laureate-level attributes.
# NOTE(review): if `laureates` holds one row per awarded prize, this frame can
# contain repeated laureate_id rows — confirm whether de-duplication is wanted.
nobel_laureates = laureates.select('laureate_id','category','laureate_type','full_name','birth_date',
    'birth_city','birth_country','sex','organization_name','organization_city','organization_country',
    'death_date','death_city','death_country')

# Prize-level attributes are taken from the same `laureates` frame.
# (Selecting from `nobel_prizes` itself raised NameError on a fresh kernel,
# because that name is not defined before this statement.)
nobel_prizes = laureates.select("year","prize","motivation","prize_share","laureate_id")

cities = worldcitiespop.drop("region")

In [12]:
# nobel_prizes.show(1)
# nobel_laureates.show(1)
# countries.show(1)
# cities.show(1)

+----+--------------------+--------------------+-----------+-----------+
|year|               prize|          motivation|prize_share|laureate_id|
+----+--------------------+--------------------+-----------+-----------+
|1901|The Nobel Prize i...|in recognition of...|        1/1|        160|
+----+--------------------+--------------------+-----------+-----------+
only showing top 1 row

+-----------+---------+-------------+--------------------+----------+----------+-------------+----+-----------------+-----------------+--------------------+----------+----------+-------------+
|laureate_id| category|laureate_type|           full_name|birth_date|birth_city|birth_country| sex|organization_name|organization_city|organization_country|death_date|death_city|death_country|
+-----------+---------+-------------+--------------------+----------+----------+-------------+----+-----------------+-----------------+--------------------+----------+----------+-------------+
|        160|Chemistry|   Indivi

## Dataset preparation

In [None]:
# The country dataset is the `countries` frame as-is.
countries_dataset = countries
# Prizes combined with the laureate attributes, matched on laureate_id.
nobel_prizes_dataset = nobel_prizes.join(nobel_laureates, on='laureate_id')

In [13]:
# nobel_prizes_dataset.show(1)
# countries_dataset.show(1)

+-----------+----+--------------------+--------------------+-----------+---------+-------------+--------------------+----------+----------+-------------+----+-----------------+-----------------+--------------------+----------+----------+-------------+
|laureate_id|year|               prize|          motivation|prize_share| category|laureate_type|           full_name|birth_date|birth_city|birth_country| sex|organization_name|organization_city|organization_country|death_date|death_city|death_country|
+-----------+----+--------------------+--------------------+-----------+---------+-------------+--------------------+----------+----------+-------------+----+-----------------+-----------------+--------------------+----------+----------+-------------+
|        160|1901|The Nobel Prize i...|in recognition of...|        1/1|Chemistry|   Individual|Jacobus Henricus ...|1852-08-30| Rotterdam|  Netherlands|Male|Berlin University|           Berlin|             Germany|1911-03-01|    Berlin|     Ge

In [43]:
# Partition layouts: country data is partitioned by quarter, everything else by month.
quarterly_partition = ("pt_year", "pt_quarter")
monthly_partition = ("pt_year", "pt_month")
partition_values = {"pt_year": pt_year, "pt_quarter": pt_quarter, "pt_month": pt_month}


def _add_partition_columns(frame, partition_columns):
    """Return `frame` with one literal column added per name in `partition_columns`."""
    for column_name in partition_columns:
        frame = frame.withColumn(column_name, F.lit(partition_values[column_name]))
    return frame


def _save_orc_table(frame, table_name, partition_columns):
    """Write `frame` as a snappy-compressed ORC table partitioned by `partition_columns`.

    NOTE(review): mode 'overwrite' presumably replaces the whole table, not only
    the partition being written — confirm this is intended.
    """
    writer = frame.write.format("orc")
    writer = writer.mode('overwrite')
    writer = writer.option("compression","snappy")
    writer = writer.partitionBy(*partition_columns)
    writer.saveAsTable(table_name)


# Adding partition fields
countries = _add_partition_columns(countries, quarterly_partition)
cities = _add_partition_columns(cities, monthly_partition)
nobel_laureates = _add_partition_columns(nobel_laureates, monthly_partition)
nobel_prizes = _add_partition_columns(nobel_prizes, monthly_partition)

nobel_prizes_dataset = _add_partition_columns(nobel_prizes_dataset, monthly_partition)
countries_dataset = _add_partition_columns(countries_dataset, quarterly_partition)

# Persist every frame, in the same order as before.
for frame, table_name, partition_columns in (
    (countries, "countries", quarterly_partition),
    (cities, "cities", monthly_partition),
    (nobel_prizes, "nobel_prizes", monthly_partition),
    (nobel_laureates, "nobel_laureates", monthly_partition),
    (nobel_prizes_dataset, "nobel_prizes_dataset", monthly_partition),
    (countries_dataset, "countries_dataset", quarterly_partition),
):
    _save_orc_table(frame, table_name, partition_columns)