In [93]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Spark configuration for this notebook: standalone master URL and app name.
conf = (
    SparkConf()
    .setMaster('spark://13.92.116.204:7077')
    .setAppName('spark-read')
)

In [94]:
# Reuse the already-running SparkContext when this cell is executed again;
# constructing a second SparkContext while one is active raises ValueError.
# `conf` is only applied when a new context actually has to be created.
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Read every tab-delimited file matching the S3 glob into one DataFrame.
# NOTE(review): this cell relies on `sqlContext`, which is only created in a
# later cell of this notebook, and no visible cell below reads `indata_creds`.
stock_reader = sqlContext.read.format('com.databricks.spark.csv').option('delimiter', '\t')
indata_creds = stock_reader.load('s3://stock-data-test/stock/*csv')

In [95]:
from pyspark.sql import SQLContext

In [96]:
# SQL entry point bound to the SparkContext `sc`; every `sqlContext.read`
# call in this notebook goes through this object.
sqlContext = SQLContext(sc)

In [97]:
19/08/22 16:24:27 WARN FileStreamSink: Error while looking for metadata directory.

In [98]:
df

DataFrame[MATERIAL: string, PLNT: string, BATCH: string, UNRESTRICTED: string, TRANSIT/TRANSF.: string, IN QUALITY INSP.: string, RESTRICTED-USE: string, BLOCKED: string, RETURNS: string, STATUS_DATE: string]

In [99]:
df.columns

['MATERIAL',
 'PLNT',
 'BATCH',
 'UNRESTRICTED',
 'TRANSIT/TRANSF.',
 'IN QUALITY INSP.',
 'RESTRICTED-USE',
 'BLOCKED',
 'RETURNS',
 'STATUS_DATE']

In [100]:
# Map the raw export headers onto the column names used by the later cells.
header_renames = [
    ('UNRESTRICTED', "st1"),
    ("TRANSIT/TRANSF.", "st2"),
    ("MATERIAL", 'sku_code'),
    ("PLNT", "sc_node_code"),
    ("STATUS_DATE", "status_date"),
    ("BATCH", "batch"),
]
for raw_name, clean_name in header_renames:
    df = df.withColumnRenamed(raw_name, clean_name)

In [101]:
from pyspark.sql.types import DoubleType

In [102]:
# Replace the header that contains a space and a trailing dot with an
# identifier-safe name before the column is referenced via df[...] below.
df = df.withColumnRenamed('IN QUALITY INSP.',"IN_QUALITY_INSP")

In [103]:
df

DataFrame[sku_code: string, sc_node_code: string, batch: string, st1: string, st2: string, IN_QUALITY_INSP: string, RESTRICTED-USE: string, BLOCKED: string, RETURNS: string, status_date: string]

In [104]:
df.columns

['sku_code',
 'sc_node_code',
 'batch',
 'st1',
 'st2',
 'IN_QUALITY_INSP',
 'RESTRICTED-USE',
 'BLOCKED',
 'RETURNS',
 'status_date']

In [105]:
# Build the combined "st3" quantity by adding the four source columns in order.
st3_parts = ['IN_QUALITY_INSP', 'RESTRICTED-USE', 'BLOCKED', 'RETURNS']
st3_total = df[st3_parts[0]]
for part_name in st3_parts[1:]:
    st3_total = st3_total + df[part_name]
df = df.withColumn('st3', st3_total)

In [106]:
# The individual columns are no longer needed once they are summed into st3.
cols_to_drop = [
    'IN_QUALITY_INSP',
    'RESTRICTED-USE',
    'BLOCKED',
    'RETURNS',
]
df = df.drop(*cols_to_drop)

In [107]:
df

DataFrame[sku_code: string, sc_node_code: string, batch: string, st1: string, st2: string, status_date: string, st3: double]

In [108]:
# Convert st1 from string to double, keeping the same column name.
st1_as_double = df["st1"].cast(DoubleType())
df = df.withColumn("st1", st1_as_double)

In [109]:
# Convert st2 from string to double, keeping the same column name.
st2_as_double = df["st2"].cast(DoubleType())
df = df.withColumn("st2", st2_as_double)

In [110]:
df

DataFrame[sku_code: string, sc_node_code: string, batch: string, st1: double, st2: double, status_date: string, st3: double]

In [111]:
# Unpivot st1/st2/st3 into (stock_type, stock_qty) pairs: each input row
# yields three output rows, one per stock bucket.
id_columns = ["status_date", "sku_code", "sc_node_code", "batch"]
unpivot_expr = "stack(3, 'st1', st1, 'st2', st2, 'st3' ,st3) as (stock_type, stock_qty)"
df = df.selectExpr(*id_columns, unpivot_expr)

In [112]:
df

DataFrame[status_date: string, sku_code: string, sc_node_code: string, batch: string, stock_type: string, stock_qty: double]

In [113]:
df.show()

+-----------+-------------+------------+----------+----------+---------+
|status_date|     sku_code|sc_node_code|     batch|stock_type|stock_qty|
+-----------+-------------+------------+----------+----------+---------+
| 2019-06-22|RL10J0AS10AH1|        1002|LIAW      |       st1|   2136.0|
| 2019-06-22|RL10J0AS10AH1|        1002|LIAW      |       st2|      0.0|
| 2019-06-22|RL10J0AS10AH1|        1002|LIAW      |       st3|      0.0|
| 2019-06-22|RL10J0AS10AH1|        AP01|LIAW      |       st1|      4.0|
| 2019-06-22|RL10J0AS10AH1|        AP01|LIAW      |       st2|      0.0|
| 2019-06-22|RL10J0AS10AH1|        AP01|LIAW      |       st3|      0.0|
| 2019-06-22|RL10J0AS10AH1|        AP02|LIAW      |       st1|      2.0|
| 2019-06-22|RL10J0AS10AH1|        AP02|LIAW      |       st2|      0.0|
| 2019-06-22|RL10J0AS10AH1|        AP02|LIAW      |       st3|      0.0|
| 2019-06-22|RL10J0AS10AH1|        AP05|LIAW      |       st1|      2.0|
| 2019-06-22|RL10J0AS10AH1|        AP05|LIAW      |

In [114]:
# Load the forecast CSV, taking column names from its header row.
forecast_df = sqlContext.read.load(
    "../Data/forecast_june.csv",
    format="csv",
    header="true",
)

In [115]:
# Load the calendar CSV, taking column names from its header row.
calendar_df = sqlContext.read.load(
    "calendar.csv",
    format="csv",
    header="true",
)

In [116]:
# Keep only the calendar columns that the later joins and division use.
calendar_df = calendar_df.select(
    'for_date',
    'for_month',
    'for_year',
    'days_in_month',
)

In [117]:
calendar_df

DataFrame[for_date: string, for_month: string, for_year: string, days_in_month: string]

In [118]:
# Distinct (sku, node, channel, month, year) combinations present in the forecast.
product_key_columns = ['sku_code', 'sc_node_code', 'sales_channel_code', 'for_month', 'for_year']
products_df = forecast_df.select(*product_key_columns).dropDuplicates()

In [119]:
products_df

DataFrame[sku_code: string, sc_node_code: string, sales_channel_code: string, for_month: string, for_year: string]

In [120]:
# Attach the distinct product keys to every calendar row that shares the same
# month and year; calendar rows without a matching product are kept.
calendar_join_keys = ['for_month', 'for_year']
products_df = calendar_df.join(
    products_df,
    on=calendar_join_keys,
    how='left_outer',
)

In [121]:
products_df

DataFrame[for_month: string, for_year: string, for_date: string, days_in_month: string, sku_code: string, sc_node_code: string, sales_channel_code: string]

In [122]:
# Bring the remaining forecast columns back onto each product/calendar row.
forecast_join_keys = [
    'for_month',
    'for_year',
    'sku_code',
    'sc_node_code',
    'sales_channel_code',
]
products_df = products_df.join(forecast_df, on=forecast_join_keys, how='left')

In [125]:
# Daily forecast = forecast quantity divided by the number of days in the month.
daily_forecast = products_df['forecast_qty'] / products_df['days_in_month']
products_df = products_df.withColumn('forecast_dly', daily_forecast)

In [126]:
forecast_df.show()

+----------+---------+--------+--------+-------+-------------+------------+------------------+------------+--------------+
|source_tag|for_month|for_year|in_month|in_year|     sku_code|sc_node_code|sales_channel_code|forecast_qty|forecast_value|
+----------+---------+--------+--------+-------+-------------+------------+------------------+------------+--------------+
|         1|        6|    2019|       6|   2019|RL10J0AS10AH1|        1002|               REP|        1946|       4378500|
|         1|        6|    2019|       6|   2019|RL10J0AS10AH1|        AP01|               REP|         300|        675000|
|         1|        6|    2019|       6|   2019|RL10J0AS10AH1|        AP02|               REP|          15|         33750|
|         1|        6|    2019|       6|   2019|RL10J0AS10AH1|        AP03|               REP|          30|         67500|
|         1|        6|    2019|       6|   2019|RL10J0AS10AH1|        AP04|               REP|         200|        450000|
|         1|    

In [12]:
import os

# Postgres connection settings. Every value except the driver can be supplied
# through the conventional PG* environment variables so that credentials do
# not have to live in the notebook. The previous hard-coded values remain as
# fallbacks so existing runs keep working.
# NOTE(review): drop the password fallback once the environment is configured.
dbDetails = {
    "driver": "postgresql",
    "username": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "password"),
    "host": os.environ.get("PGHOST", "localhost"),
    "port": os.environ.get("PGPORT", "5432"),
    "database": os.environ.get("PGDATABASE", "alpha_haldiram"),
}

In [18]:

"""
calendar_df = sqlContext.read \
    .format("jdbc") \
    .option("url",'jdbc:postgresql://localhost:5432/alpha_emami') \
    .option("dbtable", "bi.calendar_master") \
    .load()
"""


'\ncalendar_df = sqlContext.read     .format("jdbc")     .option("url",\'jdbc:postgresql://localhost:5432/alpha_emami\')     .option("dbtable", "bi.calendar_master")     .load()\n'

In [127]:
products_df.show()

+---------+--------+-------------+------------+------------------+----------+-------------+----------+--------+-------+------------+--------------+------------+
|for_month|for_year|     sku_code|sc_node_code|sales_channel_code|  for_date|days_in_month|source_tag|in_month|in_year|forecast_qty|forecast_value|forecast_dly|
+---------+--------+-------------+------------+------------------+----------+-------------+----------+--------+-------+------------+--------------+------------+
|        6|    2019|RYQHJ0TUB0AE1|        DL04|               REP|01-06-2019|           30|         1|       6|   2019|           0|             0|         0.0|
|        6|    2019|RYN5D0TUB0AE1|        DL01|               REP|01-06-2019|           30|         1|       6|   2019|           0|             0|         0.0|
|        6|    2019|RYN4Q0TUB0AP1|        PY01|               REP|01-06-2019|           30|         1|       6|   2019|           0|             0|         0.0|
|        6|    2019|RYN4Q0TUB0AP1|

In [128]:
# Rename the stock date column to `for_date` so it shares a name with the
# calendar date column used as a join key in the following cell.
# NOTE(review): in the outputs shown in this notebook the stock dates print as
# yyyy-MM-dd while the calendar's for_date prints as dd-MM-yyyy.
df = df.withColumnRenamed('status_date',"for_date")

In [129]:
from pyspark.sql import functions as F

# The stock side carries for_date as 'yyyy-MM-dd' strings while the calendar
# side (products_df) carries 'dd-MM-yyyy' strings, so a plain string-equality
# join on for_date cannot match any row. Reformat the stock date to the
# calendar's layout first. `df` itself is left untouched so this cell can be
# re-run safely.
stock_df = df.withColumn(
    'for_date',
    F.date_format(F.to_date(df['for_date'], 'yyyy-MM-dd'), 'dd-MM-yyyy'),
)

# Left join: every product/calendar row is kept; rows without stock for that
# sku, node and date get nulls in the stock columns.
final_df = products_df.join(
    stock_df,
    on=['sku_code', 'sc_node_code', 'for_date'],
    how='left',
)

In [130]:
# Replace nulls with 0 in the numeric columns; string columns are not
# affected by a numeric fill value.
final_df = final_df.na.fill(0)

In [88]:
%%time
final_df.write.format('csv').save('final_stock_secondary.csv')

Wall time: 45.7 s


In [131]:
final_df.show()

+-------------+------------+----------+---------+--------+------------------+-------------+----------+--------+-------+------------+--------------+-------------------+-----+----------+---------+
|     sku_code|sc_node_code|  for_date|for_month|for_year|sales_channel_code|days_in_month|source_tag|in_month|in_year|forecast_qty|forecast_value|       forecast_dly|batch|stock_type|stock_qty|
+-------------+------------+----------+---------+--------+------------------+-------------+----------+--------+-------+------------+--------------+-------------------+-----+----------+---------+
|RL10J0AS10AH1|        AP07|28-06-2019|        6|    2019|               REP|           30|         1|       6|   2019|          30|         67500|                1.0| null|      null|      0.0|
|RL10J0AS10AH1|        BR03|05-06-2019|        6|    2019|               REP|           30|         1|       6|   2019|           0|             0|                0.0| null|      null|      0.0|
|RL10J0AS10AH1|        DL

In [132]:
%%time
final_df.write.format('com.databricks.spark.csv').save('final_stock_secondary_f.csv')

Wall time: 1min 7s


In [133]:
# Shut down the SparkContext; cells that use `sc` or `sqlContext` need a new
# context before they can run again.
sc.stop()