In [4]:
import locale
import numpy as npa
import pandas as pd

import pytz
from datetime import datetime

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, TimestampType, FloatType, IntegerType
from pyspark.sql.functions import (col, monotonically_increasing_id, regexp_replace,
                                   to_utc_timestamp, from_unixtime, unix_timestamp, round)

spark = SparkSession.builder.appName('ThExam').getOrCreate()

In [6]:
local_timezone = pytz.timezone('Asia/Bangkok')
execute_time = datetime.now(local_timezone)
year = execute_time.year
month = execute_time.month
day = execute_time.day

print(execute_time)
print("year:{} | month={} | day={}".format(year,month,day))

2024-03-03 10:27:08.783819+07:00
year:2024 | month=3 | day=3


In [7]:
TABLE = 'dailycheckins'

In [8]:
path = "../data/{}.csv".format(TABLE)
df = spark.read.csv(path, header=True)

In [9]:
df.sample(False, 0.1, seed=0).write\
.option("header","true")\
.mode('overwrite').csv("../test/table=dailycheckins/year={}/month={}/day={}/".format(year,month,day))

In [10]:
# 
# QA: what is the key for this dataset?
#
df = df.withColumn("id", monotonically_increasing_id())

In [11]:
df.printSchema()

root
 |-- user: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- hours: string (nullable = true)
 |-- project: string (nullable = true)
 |-- id: long (nullable = false)



In [12]:
df.count()

20500

In [13]:
df.describe().toPandas()

24/03/03 10:27:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


Unnamed: 0,summary,user,timestamp,hours,project,id
0,count,20495,20500,20500.0,20500,20500.0
1,mean,,,2.0712743902439037,,10249.5
2,stddev,,,2.0047749846887744,,5917.98459500079
3,min,arya,01/03/2018 12:00 AM,0.0,airflow,0.0
4,max,ygritte,9 января 2018 00:00,9.5,workshops,20499.0


In [14]:
df.sample(False, 0.1, seed=0).show(10, truncate=False)

+--------+----------------------------+-----+-----------+---+
|user    |timestamp                   |hours|project    |id |
+--------+----------------------------+-----+-----------+---+
|ned     |26 сентября 2019 00:00      |4.0  |bizdev     |2  |
|jaime   |2018-12-20 00:00:00 UTC     |1.5  |project-00 |9  |
|catelyn |2018-11-26 14:47:36.0429 UTC|0.08 |security   |16 |
|daenerys|2018-11-26 12:55:30.0416 UTC|8.0  |blogideas  |26 |
|jaime   |2018-11-26 12:11:58.0359 UTC|0.5  |opsandadmin|46 |
|ned     |2018-11-26 12:08:09.0357 UTC|1.5  |transit    |57 |
|jon     |2018-11-26 11:49:04.0334 UTC|2.5  |project-25 |58 |
|sansa   |2018-11-26 11:48:33.0332 UTC|2.5  |project-51 |66 |
|bran    |2018-11-26 00:00:00 UTC     |4.37 |project-31 |79 |
|catelyn |2018-11-23 13:12:16.0135 UTC|1.1  |engineering|92 |
+--------+----------------------------+-----+-----------+---+
only showing top 10 rows



# Clean

In [15]:
def find_missing(df):
    string_columns = ['user', 'project', 'timestamp']
    numeric_columns = ['hours']
    missing_values = {} 

    for index, column in enumerate(df.columns):
        if column in string_columns: 
            missing_count = df.filter(col(column).eqNullSafe(None) | col(column).isNull()).count()
            missing_values.update({column:missing_count})
        if column in numeric_columns: 
            missing_count = df.where(col(column).isin([None,np.nan])).count()
            missing_values.update({column:missing_count})
    return missing_values

In [16]:
# string_columns = ['user', 'project', 'timestamp']
# numeric_columns = ['hours']
# missing_values = {} 

# for index, column in enumerate(df.columns):
#     if column in string_columns: 
#         missing_count = df.filter(col(column).eqNullSafe(None) | col(column).isNull()).count()
#         missing_values.update({column:missing_count})
#     if column in numeric_columns: 
#         missing_count = df.where(col(column).isin([None,np.nan])).count()
#         missing_values.update({column:missing_count})

In [17]:
missing_df = pd.DataFrame.from_dict([find_missing(df)])
missing_df

NameError: name 'np' is not defined

## Casting to make it right

In [18]:
df2 = df.withColumn('id', col('id').cast(IntegerType()))

NameError: name 'IntegerType' is not defined

In [None]:
df2 = df.withColumn('user', col('id').cast(StringType()))

In [531]:
df2 = df2.withColumn('hours', col('hours').cast(FloatType()))

In [533]:
df2 = df2.withColumn('hours', round(col('hours'), 2))

In [None]:
df2 = df2.withColumn('timestamp', col('timestamp').cast(TimestampType()))

In [534]:
df2.sample(False, 0.1, seed=0).show(10, truncate=False)

+--------+----------------------------+-----+-----------+---+
|user    |timestamp                   |hours|project    |id |
+--------+----------------------------+-----+-----------+---+
|ned     |26 сентября 2019 00:00      |4.0  |bizdev     |2  |
|jaime   |2018-12-20 00:00:00 UTC     |1.5  |project-00 |9  |
|catelyn |2018-11-26 14:47:36.0429 UTC|0.08 |security   |16 |
|daenerys|2018-11-26 12:55:30.0416 UTC|8.0  |blogideas  |26 |
|jaime   |2018-11-26 12:11:58.0359 UTC|0.5  |opsandadmin|46 |
|ned     |2018-11-26 12:08:09.0357 UTC|1.5  |transit    |57 |
|jon     |2018-11-26 11:49:04.0334 UTC|2.5  |project-25 |58 |
|sansa   |2018-11-26 11:48:33.0332 UTC|2.5  |project-51 |66 |
|bran    |2018-11-26 00:00:00 UTC     |4.37 |project-31 |79 |
|catelyn |2018-11-23 13:12:16.0135 UTC|1.1  |engineering|92 |
+--------+----------------------------+-----+-----------+---+
only showing top 10 rows



In [535]:
missing_df = pd.DataFrame.from_dict([find_missing(df2)])
missing_df

Unnamed: 0,user,timestamp,hours,project
0,5,0,0,0


In [537]:
df2.filter(col('user').eqNullSafe(None) | col('user').isNull()).show(truncate=False)

+----+------------------------------+-----+----------+-----+
|user|timestamp                     |hours|project   |id   |
+----+------------------------------+-----+----------+-----+
|null|2017-12-27 10:36:14.000121 UTC|4.0  |project-40|15797|
|null|2017-12-27 10:36:14.000121 UTC|3.0  |learning  |15798|
|null|2017-10-12 10:31:44.000227 UTC|2.75 |project-47|17572|
|null|2017-10-12 10:31:44.000227 UTC|4.0  |bizdev    |17573|
|null|2017-10-12 10:31:44.000227 UTC|1.0  |transit   |17574|
+----+------------------------------+-----+----------+-----+



In [542]:
# filter out null rows 
ids = df2.filter(col('timestamp').eqNullSafe(None) | col('timestamp').isNull()).select('id').rdd.flatMap(lambda x: x).collect()
print(len(ids))

2047


24/03/02 01:46:32 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.


In [543]:
df2 = df2.filter(~col("id").isin(ids))
df2.count()

18453

In [544]:
df3 = df.filter(col("id").isin(ids))
df3.count()

2047

In [545]:
df3.sample(False, 0.1, seed=0).show(10, truncate=False)

+------------+--------------------+-----+-----------+---+
|user        |timestamp           |hours|project    |id |
+------------+--------------------+-----+-----------+---+
|jaime       |12/21/2018 12:00 AM |2.0  |project-00 |6  |
|catelyn     |23 ноября 2018 13:12|2.37 |project-40 |91 |
|joffrey     |22 ноября 2018 14:12|2.0  |bizdev     |170|
|sansa       |11/21/2018 12:00 AM |1.5  |learning   |300|
|littlefinger|11/19/2018 12:15 PM |3.0  |opsandadmin|470|
|hound       |16 ноября 2018 00:00|0.5  |events     |628|
|hound       |16 ноября 2018 00:00|0.75 |marketing  |635|
|bran        |11/15/2018 12:00 AM |6.08 |project-66 |709|
|hound       |11/13/2018 12:00 AM |0.5  |blogideas  |820|
|samwell     |12 ноября 2018 00:00|6.5  |project-26 |902|
+------------+--------------------+-----+-----------+---+
only showing top 10 rows



In [546]:
# alian formats 
# 1.09/27/2019 12:00 AM
# 2.dd MMMM yyyy HH:|mm

In [547]:
# unix_timestamp
# from_unixtime
# to_utc_timestamp

In [548]:
df3 = df3.withColumn('timestamp', to_utc_timestamp(from_unixtime(unix_timestamp(col('timestamp'), "MM/dd/yyyy hh:mm a")), 'UTC'))
# for format "MM/dd/yyyy hh:mm a"
df3.count()

In [550]:
missing_df = pd.DataFrame.from_dict([find_missing(df3)])
missing_df

Unnamed: 0,user,timestamp,hours,project
0,0,1022,0,0


In [551]:
df3.filter(col('timestamp').eqNullSafe(None) | col('timestamp').isNull()).count()

1022

In [552]:
ruIDs = df3.filter(col('timestamp').eqNullSafe(None) | col('timestamp').isNull()).select('id').rdd.flatMap(lambda x: x).collect()

24/03/02 01:46:58 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.


In [553]:
df3 = df3.filter(~col("id").isin(ruIDs))
df3.count()

1025

In [554]:
df4 = df.filter(col("id").isin(ruIDs))

In [555]:
df4.sample(False, 0.1, seed=0).show(10, truncate=False)

+-------+---------------------+-----+------------+----+
|user   |timestamp            |hours|project     |id  |
+-------+---------------------+-----+------------+----+
|viserys|26 ноября 2018 12:30 |0.5  |opsandadmin |38  |
|sansa  |23 ноября 2018 00:00 |1.0  |project-51  |140 |
|jeor   |21 ноября 2018 10:14 |1.5  |bizdev      |269 |
|catelyn|19 ноября 2018 13:29 |0.33 |opsandadmin |448 |
|catelyn|12 ноября 2018 11:01 |0.82 |project-38  |876 |
|bronn  |6 ноября 2018 00:00  |1.75 |website     |1326|
|jeor   |6 ноября 2018 00:00  |2.5  |project-30  |1341|
|jon    |31 октября 2018 15:55|4.0  |project-25  |1495|
|stannis|29 октября 2018 00:00|0.82 |project-00  |1749|
|sansa  |24 октября 2018 00:00|0.5  |datastrategy|2025|
+-------+---------------------+-----+------------+----+
only showing top 10 rows



## Cast RU

In [556]:
month_mapping = {
    'января': 'January',
    'февраля': 'February',
    'марта': 'March',
    'апреля': 'April',
    'мая': 'May',
    'июня': 'June',
    'июля': 'July',
    'августа': 'August',
    'сентября': 'September',
    'октября': 'October',
    'ноября': 'November',
    'декабря': 'December'
}
for russian_month, english_month in month_mapping.items():
    df4 = df4.withColumn('timestamp', regexp_replace(col('timestamp'), russian_month, english_month))

In [557]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")

df4 = df4.withColumn('timestamp', to_utc_timestamp(from_unixtime(unix_timestamp(col('timestamp'), "dd MMMM y HH:mm")), 'UTC'))

In [559]:
df4.sample(False, 0.1, seed=0).show(10, truncate=False)
df4.count()

+-------+-------------------+-----+------------+----+
|user   |timestamp          |hours|project     |id  |
+-------+-------------------+-----+------------+----+
|viserys|2018-11-26 12:30:00|0.5  |opsandadmin |38  |
|sansa  |2018-11-23 00:00:00|1.0  |project-51  |140 |
|jeor   |2018-11-21 10:14:00|1.5  |bizdev      |269 |
|catelyn|2018-11-19 13:29:00|0.33 |opsandadmin |448 |
|catelyn|2018-11-12 11:01:00|0.82 |project-38  |876 |
|bronn  |2018-11-06 00:00:00|1.75 |website     |1326|
|jeor   |2018-11-06 00:00:00|2.5  |project-30  |1341|
|jon    |2018-10-31 15:55:00|4.0  |project-25  |1495|
|stannis|2018-10-29 00:00:00|0.82 |project-00  |1749|
|sansa  |2018-10-24 00:00:00|0.5  |datastrategy|2025|
+-------+-------------------+-----+------------+----+
only showing top 10 rows



1022

In [560]:
missing_df = pd.DataFrame.from_dict([find_missing(df4)])
missing_df

Unnamed: 0,user,timestamp,hours,project
0,0,0,0,0


In [562]:
final_df = df2.unionByName(df3).unionByName(df4)

In [563]:
final_df.describe().toPandas()

Unnamed: 0,summary,user,hours,project,id
0,count,20495,20500.0,20500,20500.0
1,mean,,2.0712746341463437,,10249.5
2,stddev,,2.00477474819615,,5917.984595000784
3,min,arya,0.0,airflow,0.0
4,max,ygritte,9.5,workshops,20499.0


In [564]:
final_df.orderBy('id').show(10, truncate=False)

+------+-------------------+-----+--------------------+---+
|user  |timestamp          |hours|project             |id |
+------+-------------------+-----+--------------------+---+
|ned   |2019-09-27 07:00:00|8.0  |bizdev              |0  |
|robert|2019-09-27 00:00:00|8.0  |bizdev              |1  |
|ned   |2019-09-26 00:00:00|4.0  |bizdev              |2  |
|ned   |2019-09-26 07:00:00|1.0  |cultureandmanagement|3  |
|ned   |2019-09-26 07:00:00|1.5  |project-00          |4  |
|ned   |2019-09-26 07:00:00|1.0  |project-43          |5  |
|jaime |2018-12-21 00:00:00|2.0  |project-00          |6  |
|jaime |2018-12-21 07:00:00|0.5  |project-47          |7  |
|jaime |2018-12-21 07:00:00|3.5  |project-47          |8  |
|jaime |2018-12-20 07:00:00|1.5  |project-00          |9  |
+------+-------------------+-----+--------------------+---+
only showing top 10 rows



In [570]:
final_df = final_df.orderBy('id').drop('id')
final_df.count()

20500

In [571]:
final_df.printSchema()

root
 |-- user: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- hours: string (nullable = true)
 |-- project: string (nullable = true)



In [572]:
final_df.show(10, truncate=False)

+------+-------------------+-----+--------------------+
|user  |timestamp          |hours|project             |
+------+-------------------+-----+--------------------+
|ned   |2019-09-27 07:00:00|8.0  |bizdev              |
|robert|2019-09-27 00:00:00|8.0  |bizdev              |
|ned   |2019-09-26 00:00:00|4.0  |bizdev              |
|ned   |2019-09-26 07:00:00|1.0  |cultureandmanagement|
|ned   |2019-09-26 07:00:00|1.5  |project-00          |
|ned   |2019-09-26 07:00:00|1.0  |project-43          |
|jaime |2018-12-21 00:00:00|2.0  |project-00          |
|jaime |2018-12-21 07:00:00|0.5  |project-47          |
|jaime |2018-12-21 07:00:00|3.5  |project-47          |
|jaime |2018-12-20 07:00:00|1.5  |project-00          |
+------+-------------------+-----+--------------------+
only showing top 10 rows



In [573]:
final_df = final_df.withColumn('timestamp', date_format(from_utc_timestamp(col('timestamp'), "UTC"), "yyyy-MM-dd HH:mm:ss"))

In [574]:
final_df.write\
.option("header","true")\
.mode('overwrite').csv("../state=transform/table=dailycheckins/year={}/month={}/day={}/".format(year,month,day))

# Production

In [20]:
# schema = StructType([
#     StructField("user",StringType(),True),
#     StructField("timestamp",TimestampType(),True),
#     StructField("hours",FloatType(),True),
#     StructField("project",StringType(),True),
#   ])

In [21]:
# landing_data_path="s3/{buckey}/table=dailycheckins/year=2024/month=3/day=1/dailycheckins.csv"
landing_data_path = "../data/dailycheckins.csv"
df = spark.read.csv(path, header=True)

In [22]:
df = df.withColumn("id", monotonically_increasing_id())

In [23]:
backup_df = df.select('*')

In [24]:
df2 = df.withColumn('user', col('user').cast(StringType()))
df2 = df2.withColumn('hours', round(col('hours').cast(FloatType()), 2))
df2 = df2.withColumn('project', col('project').cast(StringType()))
df2 = df2.withColumn('timestamp', col('timestamp').cast(TimestampType()))

In [25]:
# transform 'timestamp' column 
nullIds = df2.filter(col('timestamp').eqNullSafe(None) | col('timestamp').isNull()).select('id').rdd.flatMap(lambda x: x).collect()
print(len(nullIds))

2047


In [26]:
clean_df = df2.filter(~col("id").isin(nullIds))
clean_df.count()

18453

In [27]:
df3 = df.filter(col("id").isin(nullIds))
df3.count()

2047

In [28]:
format_1 = 'MM/dd/yyyy hh:mm a'
df3 = df3.withColumn('timestamp', to_utc_timestamp(from_unixtime(unix_timestamp(col('timestamp'), format_1)), 'UTC'))
df3.count()

2047

In [29]:
ruIDs = df3.filter(col('timestamp').eqNullSafe(None) | col('timestamp').isNull()).select('id').rdd.flatMap(lambda x: x).collect()
print(len(ruIDs))

1022


In [30]:
clean_df2 = df3.filter(~col("id").isin(ruIDs))
clean_df2.count()

1025

In [31]:
df4 = df.filter(col("id").isin(ruIDs))
df4.count()

1022

In [32]:
def make_datetime_ru(df):
    month_mapping = {
    'января': 'January',
    'февраля': 'February',
    'марта': 'March',
    'апреля': 'April',
    'мая': 'May',
    'июня': 'June',
    'июля': 'July',
    'августа': 'August',
    'сентября': 'September',
    'октября': 'October',
    'ноября': 'November',
    'декабря': 'December'
    }
    for russian_month, english_month in month_mapping.items():
        df = df.withColumn('timestamp', regexp_replace(col('timestamp'), russian_month, english_month))
    return df

In [33]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")

In [34]:
df4 = make_datetime_ru(df4)

In [35]:
df4.sample(False, 0.1, seed=0).show(10, truncate=False)

+-------+----------------------+-----+------------+----+
|user   |timestamp             |hours|project     |id  |
+-------+----------------------+-----+------------+----+
|viserys|26 November 2018 12:30|0.5  |opsandadmin |38  |
|sansa  |23 November 2018 00:00|1.0  |project-51  |140 |
|jeor   |21 November 2018 10:14|1.5  |bizdev      |269 |
|catelyn|19 November 2018 13:29|0.33 |opsandadmin |448 |
|catelyn|12 November 2018 11:01|0.82 |project-38  |876 |
|bronn  |6 November 2018 00:00 |1.75 |website     |1326|
|jeor   |6 November 2018 00:00 |2.5  |project-30  |1341|
|jon    |31 October 2018 15:55 |4.0  |project-25  |1495|
|stannis|29 October 2018 00:00 |0.82 |project-00  |1749|
|sansa  |24 October 2018 00:00 |0.5  |datastrategy|2025|
+-------+----------------------+-----+------------+----+
only showing top 10 rows



In [36]:
format_2 = "dd MMMM y HH:mm"
df4 = df4.withColumn('timestamp', to_utc_timestamp(from_unixtime(unix_timestamp(col('timestamp'), format_2)), 'UTC'))
clean_df3 = df4.select('*')

In [37]:
final_df = clean_df.unionByName(clean_df2).unionByName(clean_df3)

In [38]:
final_df.write\
.option("header","true")\
.mode('overwrite').csv("../state=transform/table=dailycheckins/year={}/month={}/day={}/".format(year,month,day))

# --- to debug --- 

In [55]:
df3.sample(False, 0.1, seed=0).show(10, truncate=False)

+------------+---------+-----+-----------+---+
|user        |timestamp|hours|project    |id |
+------------+---------+-----+-----------+---+
|jaime       |null     |2.0  |project-00 |6  |
|catelyn     |null     |2.37 |project-40 |91 |
|joffrey     |null     |2.0  |bizdev     |170|
|sansa       |null     |1.5  |learning   |300|
|littlefinger|null     |3.0  |opsandadmin|470|
|hound       |null     |0.5  |events     |628|
|hound       |null     |0.75 |marketing  |635|
|bran        |null     |6.08 |project-66 |709|
|hound       |null     |0.5  |blogideas  |820|
|samwell     |null     |6.5  |project-26 |902|
+------------+---------+-----+-----------+---+
only showing top 10 rows

