In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, lpad, concat, expr
spark = SparkSession.builder.appName('Egg Price Estimation').getOrCreate()

In [None]:
## Weather Data
df = spark.read.csv('data/train/72G600_weather_raw.csv', header=True)
df = df.withColumnRenamed("平均氣溫(℃)", "avg_temperature")
df = df.withColumnRenamed("最高氣溫(℃)", "max_temperature")
df = df.withColumnRenamed("最低氣溫(℃)", "min_temperature")
df = df.withColumnRenamed("累計雨量(mm)", "rainfall")
df = df.withColumnRenamed("累積日照時數", "sunshine")
df = df.withColumn('year_', split(col("觀測時間"), "/").getItem(0))
df = df.withColumn('month_', split(col("觀測時間"), "/").getItem(1))
df = df.withColumn('date_', split(col("觀測時間"), "/").getItem(2))
df = df.withColumn('month', concat(df.year_, lpad(df.month_, 2, '0')))
df = df.withColumn('date', lpad(df.date_, 2, '0'))
df = df.select(['month', 'date', 'avg_temperature', 'max_temperature', 'min_temperature', 'rainfall', 'sunshine'])
pandas_df = df.toPandas()
pandas_df.to_csv("data/train/72G600_WEATHER.csv", index=False)

In [None]:
## Covid Data
df = spark.read.csv('data/covid_raw.csv', header=True).select(['location', 'date', 'total_cases', 'new_cases', 'total_deaths', 'new_deaths', 'positive_rate'])
df = df.filter(df.location == 'Taiwan')
df = df.withColumn('year_', split(col("date"), "-").getItem(0))
df = df.withColumn('month_', split(col("date"), "-").getItem(1))
df = df.withColumn('date_', split(col("date"), "-").getItem(2))
df = df.withColumn('month', concat(df.year_, lpad(df.month_, 2, '0')))
df = df.withColumn('date', lpad(df.date_, 2, '0'))
df = df.select(['month', 'date', 'total_cases', 'new_cases', 'total_deaths', 'new_deaths', 'positive_rate'])
pandas_df = df.toPandas()
pandas_df.to_csv("data/COVID.csv", index=False)

In [None]:
## Egg Production Data
df = spark.read.csv('data/train/egg_production_raw.csv', header=True).select(['年','月','入中雞雛數','產蛋隻數','均日產蛋箱數','年產蛋數','淘汰隻數','目前換羽隻數'])
df = df.withColumn('year_', df.年)
df = df.withColumn('month_', df.月)
df = df.withColumn('month', concat(df.year_, lpad(df.month_, 2, '0')))
df = df.select(['month', '入中雞雛數', '產蛋隻數', '均日產蛋箱數', '年產蛋數', '淘汰隻數', '目前換羽隻數'])
pandas_df = df.toPandas()
pandas_df.to_csv("data/train/EGG_PRODUCTION.csv", index=False)

In [None]:
## Weed Price Data
df = spark.read.csv('data/train/weed_price_raw.csv', header=True)
df = df.withColumnRenamed("年", "y")
df = df.withColumnRenamed("月", "m")
df = df.withColumnRenamed("玉米粒(進口-產業協會)", "corn_price")
df = df.withColumnRenamed("米糠(國產-產業協會)", "rice_price")
df = df.withColumnRenamed("麩皮(白-產業協會)", "bran_price")
df = df.withColumnRenamed("黃豆粉(進口-產業協會)", "soy_price")
df = df.withColumnRenamed("魚粉(進口貨)", "fish_meal_price")
df = df.withColumnRenamed("蛋雞飼料", "chicken_feed_price")
df = df.withColumn('year_', expr("substring(y, 1, length(y)-1)"))
df = df.withColumn('month_', expr("substring(m, 1, length(m)-1)"))
df = df.withColumn('year_', df.year_.cast('integer'))
df = df.withColumn('year_', df.year_+1911)
df = df.withColumn('month', concat(df.year_, lpad(df.month_, 2, '0')))
df = df.select(['month', 'corn_price', 'rice_price', 'bran_price', 'soy_price', 'fish_meal_price', 'chicken_feed_price'])
pandas_df = df.toPandas()
pandas_df.to_csv("data/train/WEED_PRICE.csv", index=False)