In [None]:
%%configure
{
  "conf": {
    "spark.jars": "hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.hive.convertMetastoreParquet": "false"
  }
}

In [None]:
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor


In [None]:
# Snapshot query: load the Hudi dataset from S3 and preview it.
# The '/*/*' glob is appended to the table base path before loading.
# NOTE(review): `spark` is used here before the explicit session-builder cell;
# presumably the kernel pre-defines it — confirm.
snapshotQueryDF = spark.read.format('org.apache.hudi').load('s3://md-labs-hudi-demo-data-bucket/hudi/retail_transactions' + '/*/*')
# Python names are case-sensitive: the frame is bound as `snapshotQueryDF`,
# so it must be referenced with that exact spelling (a capitalised
# `SnapshotQueryDF` is an undefined name and raises NameError).
snapshotQueryDF.show()

In [None]:


from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, array, ArrayType, DateType, DecimalType
from pyspark.sql.functions import *
# Build (or reuse) the SparkSession for this notebook. The Hudi bundle and
# spark-avro jars are passed through "spark.jars", Kryo is selected as the
# serializer, and "spark.sql.hive.convertMetastoreParquet" is set to "false".
# NOTE(review): getOrCreate() hands back an already-running session when one
# exists; presumably "spark.jars" is then not re-applied — confirm.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("Product_Price_Tracking")
    .config(
        "spark.jars",
        "s3://aws-analytics-course/hudi/jar/hudi-spark-bundle.jar,s3://aws-analytics-course/hudi/jar/spark-avro.jar",
    )
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)

In [None]:
# Value passed as "hoodie.table.name" by the Hudi write cells.
TABLE_NAME = "coal_prod"
# CSV file read for the initial load of the table.
S3_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/LOAD00000001.csv"
# Base path the Hudi write cells save to.
S3_HUDI_DATA = "s3://aws-analytics-course/hudi/data/coal_prod"

In [None]:
# Explicit column layout for the coal CSV files. The files are read with
# header=False, so column names and types come from this schema alone.
coal_prod_schema = (
    StructType()
    .add("Mode", StringType())
    .add("Entity", StringType())
    .add("Code", StringType())
    .add("Year", IntegerType())
    .add("Production", DecimalType(10, 2))
    .add("Consumption", DecimalType(10, 2))
)

# Initial load file, parsed with the schema above.
df_coal_prod = spark.read.csv(path=S3_RAW_DATA, schema=coal_prod_schema, header=False)

In [None]:
from pyspark.sql.functions import concat, lit, col
# Derive the record key by concatenating Entity and Year (the query cells
# filter on values such as key='India2013').
# withColumn appends "key" on the first run and *replaces* it on later runs,
# so re-executing this cell no longer produces a second, ambiguous "key"
# column the way select("*", ... .alias("key")) on the reassigned frame did.
df_coal_prod = df_coal_prod.withColumn(
    "key", concat(col("Entity"), lit(""), col("Year"))
)

# The Mode column is not written to the table; drop it by name.
df_coal_prod_f = df_coal_prod.drop("Mode")
df_coal_prod_f.show(5)

In [None]:
# Initial load: bulk-insert the prepared frame into the Hudi table at
# S3_HUDI_DATA. Save mode "overwrite" is used for this first write.
hudi_bulk_insert_opts = {
    "hoodie.table.name": TABLE_NAME,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.recordkey.field": "key",
    # The precombine field is the same column as the record key.
    "hoodie.datasource.write.precombine.field": "key",
}

(
    df_coal_prod_f.write
    .format("org.apache.hudi")
    .options(**hudi_bulk_insert_opts)
    .mode("overwrite")
    .save(S3_HUDI_DATA)
)

In [None]:
# Read the Hudi table back after the initial load and sanity-check it.
# The path is derived from S3_HUDI_DATA (same resulting string as before) so
# the read location cannot drift from the location the write cells save to.
df_final = spark.read.format("org.apache.hudi").load(
    S3_HUDI_DATA + "/default/*.parquet"
)

# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; it registers the same session-scoped view name.
df_final.createOrReplaceTempView("coal_prod")

# Row count, then the record for one sample key.
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2013'").show(5)

In [None]:
# Incremental files matching the 20200808-* pattern, parsed with the same
# schema as the initial load.
S3_INCR_RAW_DATA = "s3://aws-analytics-course/raw/dms/fossil/coal_prod/20200808-*.csv"
df_coal_prod_incr = spark.read.csv(
    path=S3_INCR_RAW_DATA, schema=coal_prod_schema, header=False
)

# Keep only rows whose Mode is 'U' or 'I' and attach the Entity+Year key.
df_coal_prod_incr_u_i = (
    df_coal_prod_incr
    .filter("Mode IN ('U', 'I')")
    .select("*", concat(col("Entity"), lit(""), col("Year")).alias("key"))
)
df_coal_prod_incr_u_i.show(5)

# Frame handed to the upsert write: same rows without the Mode column.
df_coal_prod_incr_u_i_f = df_coal_prod_incr_u_i.drop("Mode")
df_coal_prod_incr_u_i_f.show()

In [None]:
# Upsert the 'U'/'I' rows into the existing Hudi table (save mode "append").
hudi_upsert_opts = {
    "hoodie.table.name": TABLE_NAME,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.upsert.shuffle.parallelism": 20,
    "hoodie.datasource.write.recordkey.field": "key",
    # The precombine field is the same column as the record key.
    "hoodie.datasource.write.precombine.field": "key",
}

(
    df_coal_prod_incr_u_i_f.write
    .format("org.apache.hudi")
    .options(**hudi_upsert_opts)
    .mode("append")
    .save(S3_HUDI_DATA)
)

In [None]:
# Read the Hudi table back after the upsert and sanity-check it.
# The path is derived from S3_HUDI_DATA (same resulting string as before) so
# the read location cannot drift from the location the write cells save to.
df_final = spark.read.format("org.apache.hudi").load(
    S3_HUDI_DATA + "/default/*.parquet"
)

# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; it registers the same session-scoped view name.
df_final.createOrReplaceTempView("coal_prod")

# Row count, then the record for one sample key.
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2013'").show(5)

In [None]:
# Rows flagged Mode = 'D' in the incremental files, with the Entity+Year key
# attached so they can be matched against existing records.
df_coal_prod_incr_d = (
    df_coal_prod_incr
    .filter("Mode IN ('D')")
    .select("*", concat(col("Entity"), lit(""), col("Year")).alias("key"))
)

# Drop Mode by name on this frame. The previous code passed a column taken
# from a *different* DataFrame (df_coal_prod_incr_u_i.Mode), which made this
# cell depend on the upsert-preparation cell having been run and on both
# frames sharing the same lineage.
df_coal_prod_incr_d_f = df_coal_prod_incr_d.drop("Mode")
df_coal_prod_incr_d_f.show()

In [None]:
# Write the 'D' rows to the Hudi table as an upsert (save mode "append")
# with the payload class overridden to EmptyHoodieRecordPayload.
# NOTE(review): presumably the empty payload is what removes the matching
# keys, and the class's package may differ between Hudi releases — confirm
# against the Hudi bundle in use.
hudi_delete_opts = {
    "hoodie.table.name": TABLE_NAME,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.upsert.shuffle.parallelism": 20,
    "hoodie.datasource.write.recordkey.field": "key",
    "hoodie.datasource.write.precombine.field": "key",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.EmptyHoodieRecordPayload",
}

(
    df_coal_prod_incr_d_f.write
    .format("org.apache.hudi")
    .options(**hudi_delete_opts)
    .mode("append")
    .save(S3_HUDI_DATA)
)

In [None]:
# Read the Hudi table back after the delete write and sanity-check it.
# The path is derived from S3_HUDI_DATA (same resulting string as before) so
# the read location cannot drift from the location the write cells save to.
df_final = spark.read.format("org.apache.hudi").load(
    S3_HUDI_DATA + "/default/*.parquet"
)

# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; it registers the same session-scoped view name.
df_final.createOrReplaceTempView("coal_prod")

# Row count, then a lookup of one sample key.
spark.sql("select count(*) from coal_prod").show(5)
spark.sql("select * from coal_prod where key='India2010'").show(5)