In [1]:
import sys
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, TimestampType


import pandas as pd
import numpy as np

In [2]:
# Build (or reuse) the notebook's Spark session, running locally via the
# "local[*]" master. Eager evaluation is switched on so DataFrames can be
# rendered directly as cell output.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .config("spark.sql.repl.eagerEval.enabled", "true")
    .getOrCreate()
)

In [3]:
# Load the raw transactions. Parquet files carry their own schema, so the
# CSV-only `header` option was dropped: the Parquet reader does not use it.
df = spark.read.parquet('/opt/airflow/datasets/online_payment.parquet')

In [4]:
# Inspect the column names and types Spark picked up from the file
# (the recorded output below shows every column typed as string).
df.printSchema()

root
 |-- step: string (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: string (nullable = true)
 |-- newbalanceOrig: string (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: string (nullable = true)
 |-- newbalanceDest: string (nullable = true)
 |-- isFraud: string (nullable = true)
 |-- isFlaggedFraud: string (nullable = true)



In [5]:
# Anchor of the synthetic timeline (for example '2023-07-01 00:00:00'),
# held as a timestamp literal column so it can take part in column arithmetic.
start_time = F.lit("2023-07-01 00:00:00").cast(TimestampType())

df = (
    df
    # "step" is read as a string; cast it to an integer before doing arithmetic.
    .withColumn("step", F.col("step").cast(IntegerType()))
    # Each step counts as one hour after the anchor (step 1 -> 01:00:00).
    .withColumn("timestamp", start_time + F.col("step") * F.expr("INTERVAL 1 HOUR"))
    # Calendar day of the timestamp, rendered as a 'yyyy-MM-dd' string.
    .withColumn("date", F.date_format("timestamp", "yyyy-MM-dd"))
)

In [6]:
# Preview the transformed frame: first 20 rows, long cell values truncated.
df.show(n=20, truncate=True)

+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------------------+----------+
|step|    type|   amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|          timestamp|      date|
+----+--------+---------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+-------------------+----------+
|   1| PAYMENT|  9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|      0|             0|2023-07-01 01:00:00|2023-07-01|
|   1| PAYMENT|  1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|      0|             0|2023-07-01 01:00:00|2023-07-01|
|   1|TRANSFER|    181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|2023-07-01 01:00:00|2023-07-01|
|   1|CASH_OUT|    181

In [None]:
# Persist the enriched frame as Parquet, replacing any previous output at this path.
df.write.mode('overwrite').parquet('/opt/airflow/online_payment.parquet')