In [11]:
from datetime import datetime
from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp

In [2]:
# Directory (relative to the kernel's working directory) that Spark SQL uses
# as its warehouse; resolved to an absolute path for the session config.
WAREHOUSE_DIRNAME = 'spark-warehouse'
warehouse_location = abspath(WAREHOUSE_DIRNAME)

In [3]:
# Create (or reuse, via getOrCreate) a Hive-enabled Spark session whose
# warehouse directory is `warehouse_location`.
# NOTE(review): if a session already exists in this kernel, getOrCreate returns
# it and static settings such as the warehouse dir may not take effect.
spark = (
    SparkSession.builder
    .appName('demo')
    .config('spark.sql.warehouse.dir', warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
# Load the raw hourly consistency metrics.
# An explicit schema replaces `inferSchema`: it avoids an extra pass over the
# file and pins the column types instead of letting them depend on the data.
# `datetime` stays a string here (values look like "2022-07-11 01") and is
# converted to a timestamp in a later cell.
CONSISTENCY_CSV = 'data/consistency.csv'
df = spark.read.csv(
    CONSISTENCY_CSV,
    header=True,
    schema='feature STRING, datetime STRING, match_rate DOUBLE',
)

In [5]:
# Peek at the first few rows (long strings are truncated in the display).
df.show(n=5, truncate=True)

+--------------------+-------------+----------+
|             feature|     datetime|match_rate|
+--------------------+-------------+----------+
|fs_statistics__av...|2022-07-11 01|      0.35|
|fs_statistics__av...|2022-07-11 02|      0.68|
|fs_statistics__av...|2022-07-11 03|      0.89|
|fs_statistics__av...|2022-07-11 04|      0.92|
|fs_statistics__av...|2022-07-11 05|      0.99|
+--------------------+-------------+----------+
only showing top 5 rows



In [6]:
# Parse the hourly "yyyy-MM-dd HH" strings (e.g. "2022-07-11 01") with an
# explicit pattern rather than relying on Spark's default string-to-timestamp
# cast. The pattern mirrors the '%Y-%m-%d %H' format used for the appended rows.
df = df.withColumn('datetime', to_timestamp(col('datetime'), 'yyyy-MM-dd HH'))

In [7]:
# Print the schema to confirm `datetime` is now a timestamp column.
df.printSchema()

root
 |-- feature: string (nullable = true)
 |-- datetime: timestamp (nullable = true)
 |-- match_rate: double (nullable = true)



In [8]:
# Add the 06:00 readings for two features.
# Rows already present for the same (feature, datetime) key are removed first,
# so re-running this cell replaces those readings rather than appending
# duplicates on every execution.
# (`datetime` below is the stdlib class; the quoted 'datetime' is a column name.)
data = [
    ('fs_statistics__avg_age', datetime(2022, 7, 11, 6), 0.21),
    ('fs_statistics__avg_salary', datetime(2022, 7, 11, 6), 0.14),
]

# Reuse the current schema so the new rows get identical column names/types.
new_df = spark.createDataFrame(data=data, schema=df.schema)

key_columns = ['feature', 'datetime']
df = (
    df.join(new_df.select(key_columns), on=key_columns, how='left_anti')
    # Match columns by name, not position, when appending.
    .unionByName(new_df)
)

In [12]:
# Sort chronologically. `feature` breaks ties between rows that share the same
# hour, so the row order is deterministic across runs.
# NOTE: Spark does not guarantee this order survives a write/read round trip
# through a table; sort again when querying if order matters.
df = df.orderBy(col('datetime').asc(), col('feature').asc())

In [10]:
# Persist the data as a parquet table in the session's warehouse directory,
# replacing any previous contents of the table.
#
# Alternatives that register a view instead of writing files:
#   df.createOrReplaceTempView(TABLE_NAME)
#       -> visible only within this SparkSession
#   df.createOrReplaceGlobalTempView(TABLE_NAME)
#       -> visible to every SparkSession of this Spark application, queried as
#          global_temp.<name>; dropped when the application stops
TABLE_NAME = 'consistency_table'
df.write.format('parquet').mode('overwrite').saveAsTable(TABLE_NAME)