# Basic Spark ETL program (load, fix data types, remove duplicates, write and read). To run all steps, use the menu Cell → Run All.

# *

# Starting

# *





## Import libraries and initialize Spark

In [1]:
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import functions as F
from pyspark.sql import Window
import uuid
from datetime import datetime
# "local" runs Spark with a single worker thread; "local[*]" would use every core.
conf = SparkConf().setAppName("ETL").setMaster("local")
# getOrCreate reuses the kernel's running SparkContext when this cell is executed again;
# constructing a second SparkContext directly raises "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)
# SQL entry point used by the later cells (spark.read.csv / spark.read.parquet / spark.sql).
spark = SQLContext(sc)
# Handle to the JVM-side log4j package (not used by the cells shown here).
logger = spark._jvm.org.apache.log4j
print(sc)
print(spark)

<SparkContext master=local appName=ETL>
<pyspark.sql.context.SQLContext object at 0x7fa8300fe150>


## Function to convert column data types according to the mapping

In [2]:
def fix_dtypes(df, list_col_dtypes):
    """Cast every column of ``df`` to the Spark type given in ``list_col_dtypes``.

    Values are trimmed before casting. For numeric target types a null value is
    replaced by the string ``"0"`` before the cast, so missing numbers become 0
    instead of null.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input DataFrame (when loaded with ``load_csv_spark`` and no schema
        inference, every column arrives as a string).
    list_col_dtypes : dict
        Mapping ``column name -> Spark type name`` (e.g. ``"integer"``,
        ``"timestamp"``, ``"decimal(10,2)"``). Every column of ``df`` must appear.

    Returns
    -------
    pyspark.sql.DataFrame
        Same columns in the same order, cast as mapped. Columns whose mapped
        type is not one of the handled kinds below are passed through unchanged.

    Raises
    ------
    Exception
        If a column of ``df`` has no entry in ``list_col_dtypes``.
    """
    numeric_types = {"byte", "short", "integer", "int", "long", "bigint", "float", "double"}
    trim_and_cast_types = {"string", "timestamp", "boolean", "date"}

    projection = []
    for column, source_dtype in df.dtypes:
        target_dtype = list_col_dtypes.get(column)
        if target_dtype is None:
            # Report the column's current type (the target type is always None here).
            raise Exception("Column {} type {} isn't in the mapping, please verify.".format(column, source_dtype))

        trimmed = F.trim(F.col(column))
        if target_dtype in numeric_types or "decimal" in target_dtype:
            # Null -> "0" before the cast, so missing numeric values end up as 0.
            expr = F.coalesce(trimmed, F.lit("0")).cast(target_dtype)
        elif target_dtype in trim_and_cast_types:
            expr = trimmed.cast(target_dtype)
        else:
            expr = F.col(column)
        projection.append(expr.alias(column))

    # One select = one projection; calling withColumn in a loop adds a projection per
    # column and can produce very large query plans.
    return df.select(*projection)

## Load the CSV file that was written by another notebook (write_csv.ipynb)

In [3]:
def load_csv_spark(path):
    """Read the CSV data at ``path`` into a Spark DataFrame.

    The first line is used as the header. No schema inference is requested,
    so columns come back as strings; ``fix_dtypes`` casts them later.
    """
    reader = spark.read
    return reader.csv(path, header=True)

## Load the mapping: the correct data type of each column, the primary key of the DataFrame, and the timestamp column used to decide which duplicate row to keep.

In [4]:
def load_mapping():
    """Return the pipeline's static schema mapping.

    Returns
    -------
    list
        ``[primary_key, col_name_last_update, list_col_dtypes]`` where
        ``primary_key`` is the list of key columns, ``col_name_last_update`` is
        the column used to order duplicates, and ``list_col_dtypes`` maps each
        CSV column to its Spark type name. A fresh dict is built on every call.
    """
    # Keys must match the CSV header exactly (including the "randow" column name).
    column_types = dict(
        id="integer",
        randow="string",
        update_timestamp="timestamp",
        boolean="boolean",
    )
    return [["id"], "update_timestamp", column_types]

## Function to remove duplicate rows from the DataFrame

In [5]:
def fix_duplicity(df, list_col_dtypes, primary_key, col_name_last_update, keep_latest=True):
    """Cast the columns of ``df`` and keep exactly one row per primary key.

    The columns are first cast with ``fix_dtypes`` so that ``col_name_last_update``
    is compared as a real timestamp. Rows are then partitioned by ``primary_key``,
    numbered by ``col_name_last_update``, and only row number 1 of each partition
    is kept. If several rows share the same timestamp, which one survives is not
    determined by this ordering.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Raw input DataFrame.
    list_col_dtypes : dict
        Column -> Spark type mapping passed to ``fix_dtypes``.
    primary_key : list of str
        Column(s) identifying one logical record.
    col_name_last_update : str
        Column holding the last-update time of each row.
    keep_latest : bool, default True
        True keeps the most recent version of each record (descending order;
        null timestamps rank last). False keeps the oldest version (ascending
        order; null timestamps rank first).

    Returns
    -------
    pyspark.sql.DataFrame
        Deduplicated DataFrame with the same columns as the cast input.
    """
    df = fix_dtypes(df, list_col_dtypes)

    last_update = df[col_name_last_update]
    # Ascending order kept the *oldest* update per key and discarded later changes.
    # Spark places nulls after non-null values in descending order, so a row with a
    # missing timestamp never wins over a dated one.
    order = last_update.desc() if keep_latest else last_update.asc()
    window = Window.partitionBy(primary_key).orderBy(order)

    return (
        df.withColumn("rank_tmp", F.row_number().over(window))
        .filter(F.col("rank_tmp") == 1)
        .drop("rank_tmp")
    )

## Write the DataFrame as Parquet files to a path

In [6]:
def write_parquet(df, path, mode="overwrite"):
    """Write ``df`` as Parquet files under ``path``.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Data to write.
    path : str
        Output directory.
    mode : str, default "overwrite"
        Spark save mode ("overwrite", "append", "ignore", "error"/"errorifexists").
        The default replaces whatever already exists at ``path``.
    """
    df.write.mode(mode).parquet(path)

## Read the Parquet files from a path

In [7]:
def read_parquet(path):
    """Load the Parquet data stored under ``path`` as a Spark DataFrame.

    The schema is taken from the Parquet files themselves.
    """
    return spark.read.parquet(path)

## `process`: run the whole pipeline (load CSV → fix types and duplicates → write Parquet → read it back). The next cell measures its execution time.

In [8]:
def process(number_rows,
            input_dir="/home/jovyan/work/artifacts/data/input/users",
            path_parquet="/home/jovyan/work/artifacts/data/output/spark/"):
    """Run the ETL pipeline end to end and return the result read back from Parquet.

    Steps: load ``generated_<number_rows>_rows.csv`` from ``input_dir``, cast the
    columns and drop duplicate primary keys (``fix_duplicity``), write the result
    as Parquet to ``path_parquet``, then read that Parquet back.

    Parameters
    ----------
    number_rows : int
        Row count embedded in the input file name; must match a CSV generated earlier.
    input_dir : str
        Directory containing the generated CSV files.
    path_parquet : str
        Output directory for the Parquet files; existing content is replaced.

    Returns
    -------
    pyspark.sql.DataFrame
        The DataFrame read from ``path_parquet``.
    """
    # Read the input CSV from storage.
    path_csv = "{}/generated_{}_rows.csv".format(input_dir.rstrip("/"), number_rows)
    df = load_csv_spark(path_csv)

    # Load the mapping: key columns, ordering column and column types.
    primary_key, col_name_last_update, list_col_dtypes = load_mapping()

    # Cast types and remove duplicate keys.
    df = fix_duplicity(df, list_col_dtypes, primary_key, col_name_last_update)

    # Persist as Parquet and return the re-read data, so later queries scan the
    # Parquet files instead of re-running the CSV parse and deduplication.
    write_parquet(df, path_parquet)
    return read_parquet(path_parquet)

## Call `process`; set `number_rows` to the number of rows of the CSV file you generated earlier.

In [9]:
# Must match the row count of a CSV previously generated by write_csv.ipynb.
number_rows = 20000000
start = datetime.now()
df = process(number_rows=number_rows)
end = datetime.now()
print(end - start)  # wall-clock duration of the full pipeline

0:02:35.333882


# Now we use the output DataFrame to explore some commands

## Show the data ordered by a column

In [10]:
start = datetime.now()
# Sort by id and print the first 100 rows without truncating long values.
df.orderBy(F.col("id").asc()).show(n=100, truncate=False)
end = datetime.now()
print(end - start)

+---+------------------------------------+--------------------------+-------+
|id |randow                              |update_timestamp          |boolean|
+---+------------------------------------+--------------------------+-------+
|0  |80acc451-3dda-4425-81fb-ed1bb5617d98|2020-06-11 13:41:15.663773|false  |
|1  |2bd2df11-da25-42c2-9ed2-326339260cec|2020-06-11 13:41:15.663795|true   |
|2  |682e558e-767b-41ac-8672-ee14f8614cdc|2020-06-11 13:41:15.66381 |false  |
|3  |1851419a-e4c8-4eb7-a5d5-ac27c6fd96a4|2020-06-11 13:41:15.663823|true   |
|4  |657ee043-fe5d-462d-b946-2d2850fd45df|2020-06-11 13:41:15.663835|false  |
|5  |f6488feb-0408-4d26-a2bc-c763c9b1deee|2020-06-11 13:41:15.663847|true   |
|6  |10d626a0-fbc4-404c-a886-fc209e0a5371|2020-06-11 13:41:15.663858|false  |
|7  |591dd58e-d060-489a-9382-099d174d5c71|2020-06-11 13:41:15.663869|true   |
|8  |af1e40b3-f866-4484-91ba-dc61d918ddd0|2020-06-11 13:41:15.66388 |false  |
|9  |88d4c9ec-3d11-4291-9e58-0690c8ef0dd0|2020-06-11 13:41:15.66

## Register the DataFrame `df` as a temporary view named `df` so it can be queried with SQL (this only registers a name; no data is computed or cached)

In [11]:
start = datetime.now()
# Register (or replace) a session-scoped view so `spark.sql` can refer to it as "df".
df.createOrReplaceTempView(name="df")
end = datetime.now()
print(end - start)

0:00:00.049670


## Query the view created above with ordinary SQL

In [12]:
start = datetime.now()
# Same ordering as the DataFrame API cell above, expressed in SQL against the "df" view.
spark.sql("select * from df order by id").show(n=100, truncate=False)
end = datetime.now()
print(end - start)

+---+------------------------------------+--------------------------+-------+
|id |randow                              |update_timestamp          |boolean|
+---+------------------------------------+--------------------------+-------+
|0  |80acc451-3dda-4425-81fb-ed1bb5617d98|2020-06-11 13:41:15.663773|false  |
|1  |2bd2df11-da25-42c2-9ed2-326339260cec|2020-06-11 13:41:15.663795|true   |
|2  |682e558e-767b-41ac-8672-ee14f8614cdc|2020-06-11 13:41:15.66381 |false  |
|3  |1851419a-e4c8-4eb7-a5d5-ac27c6fd96a4|2020-06-11 13:41:15.663823|true   |
|4  |657ee043-fe5d-462d-b946-2d2850fd45df|2020-06-11 13:41:15.663835|false  |
|5  |f6488feb-0408-4d26-a2bc-c763c9b1deee|2020-06-11 13:41:15.663847|true   |
|6  |10d626a0-fbc4-404c-a886-fc209e0a5371|2020-06-11 13:41:15.663858|false  |
|7  |591dd58e-d060-489a-9382-099d174d5c71|2020-06-11 13:41:15.663869|true   |
|8  |af1e40b3-f866-4484-91ba-dc61d918ddd0|2020-06-11 13:41:15.66388 |false  |
|9  |88d4c9ec-3d11-4291-9e58-0690c8ef0dd0|2020-06-11 13:41:15.66

## Count the rows of the DataFrame

In [13]:
start = datetime.now()
# count() is an action: it triggers a full scan of the Parquet-backed DataFrame.
print(str.format("number of lines on the DF {}", df.count()))
end = datetime.now()
print(str(end - start))

number of line on the DF 20000000
0:00:00.593011


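## Time how long Spark takes to build the DataFrame from the CSV. Spark evaluates lazily, so this mostly measures defining the DataFrame rather than reading every row — keep that in mind when comparing with pandas `read_csv`, which loads the whole file.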

In [14]:
%%timeit
path_csv = "/home/jovyan/work/artifacts/data/input/users/generated_{}_rows.csv".format(number_rows)
df = load_csv_spark(path_csv)

176 ms ± 52.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
