# Event Processing - Part II

This is the second part of the event processing pipeline. It processes each event type that was partitioned by the previous part of the pipeline.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as SF
from pyspark.sql.window import Window

In [None]:
# Build (or reuse, via getOrCreate) the Spark session used by every cell below.
_builder = SparkSession.builder.appName("EventProcessingPartII")
spark = _builder.getOrCreate()

In [None]:
# Bare expression as the cell's last line so the notebook displays the session object.
spark

## Reading Base

In [None]:
# Notebook parameters: which event type and which day's input partition to process.
event_type = "transaction_update"
year = "2021"
month = "6"  # not zero-padded; presumably matches the partition names written upstream -- verify
day = "6"

# Input/output locations and the save mode used by the final write.
base_input_dir = "temp"
base_output_dir = "output"
write_mode = "overwrite"

In [None]:
# Location of the requested event type / day inside the upstream output; `/*` matches its entries.
input_path = f'{base_input_dir}/final_event_type={event_type}/year={year}/month={month}/day={day}/*'
df = spark.read.parquet(input_path)
df.printSchema()  # show the columns read from the input

## Creating New Columns

In [None]:
# Derive a proper timestamp plus the year/month/day columns used to partition the output.
# NOTE(review): assumes `timestamp` is a string/column Spark can cast to timestamp; if it is
# numeric, Spark interprets it as seconds since the epoch -- confirm against the upstream schema.
df = df.withColumn("datetime", SF.col("timestamp").cast("timestamp"))
df = df.withColumn("year", SF.year("datetime"))
df = df.withColumn("month", SF.month("datetime"))
# dayofmonth returns an integer like year()/month(); date_format(..., "d") produced a string,
# giving the three partition columns inconsistent types.
df = df.withColumn("day", SF.dayofmonth("datetime"))

In [None]:
# Inspect the schema after adding datetime/year/month/day.
df.printSchema()

## Remove duplicated events

In [None]:
# Keep a single row per event_id: the one with the earliest `datetime`.
# Spark sorts NULLs first in ascending order, so without nulls_last a duplicate whose
# timestamp failed to cast would win and end up with null year/month/day partitions.
# Rows tied on `datetime` are still picked arbitrarily by row_number().
# NOTE(review): only duplicates within this day's input are removed; duplicates spread
# across different input days are not -- confirm that is acceptable.
windowSpec = Window.partitionBy("event_id").orderBy(SF.col("datetime").asc_nulls_last())

df = (
    df.withColumn("row_number", SF.row_number().over(windowSpec))
    .filter(SF.col("row_number") == 1)
    .drop("row_number", "datetime")  # helper column and intermediate timestamp are not persisted
)

In [None]:
# Confirm row_number and datetime were dropped before writing.
df.printSchema()

## Save base

In [None]:
# Write the deduplicated events partitioned by year/month/day.
# With the default (static) partitionOverwriteMode, mode("overwrite") deletes the entire
# `{base_output_dir}/{event_type}` directory, wiping every previously processed day. Dynamic
# mode replaces only the partitions present in `df`; the option is ignored for other modes.
# NOTE(review): partitions come from event timestamps, so if this day's input contains events
# stamped on another day, that other day's partition is replaced too -- confirm this is intended.
(
    df.write
    .partitionBy("year", "month", "day")
    .option("partitionOverwriteMode", "dynamic")
    .mode(write_mode)
    .parquet(f"{base_output_dir}/{event_type}")
)