In [133]:
import os

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window


# Local Spark session using all available cores.
# NOTE: on Windows, Hadoop's local filesystem needs HADOOP_HOME / hadoop.home.dir
# (pointing at a winutils install) to be set *before* this session is created;
# otherwise file writes fail with "HADOOP_HOME and hadoop.home.dir are unset".
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("iFood Case")
    .getOrCreate()
)

In [162]:
# Project root. Override it with the IFOOD_CASE_BASE_PATH environment variable
# instead of editing a machine-specific absolute path. Paths are kept as plain
# strings ending in '/' because later cells build file paths by concatenation.
BASE_PATH = os.environ.get('IFOOD_CASE_BASE_PATH', 'D:/Downloads/IFood/ifood-case').rstrip('/')
DATA_RAW_PATH = BASE_PATH + '/data/raw/'
DATA_PROCESSED_PATH = BASE_PATH + '/data/processed/'

In [38]:
# Load the two raw inputs: offers.json (offer definitions) and
# transactions.json (per-account events), read in that order.
df_offers, df_transaction_events = (
    spark.read.json(DATA_RAW_PATH + file_name)
    for file_name in ('offers.json', 'transactions.json')
)

In [None]:
# Slim offer lookup: the offer id (renamed to match the event data) and its type.
df_offers_s = df_offers.select(
    F.col('id').alias('offer_id'),
    'offer_type',
)

In [111]:
# Purchase events only: account, purchased amount and event time.
df_transaction = (
    df_transaction_events
    .where(F.col('event') == 'transaction')
    .alias('transaction')
    .select(
        'account_id',
        F.col('value.amount').alias('amount'),
        F.col('time_since_test_start').alias('time'),
    )
)

# 'offer completed' events: account, completed offer id, reward value and event time.
# Completions store the id under `value.offer_id` (underscore).
df_offer_completed = (
    df_transaction_events
    .where(F.col('event') == 'offer completed')
    .alias('offer_completed')
    .select(
        'account_id',
        F.col('value.offer_id').alias('offer_id'),
        F.col('value.reward').alias('reward'),
        F.col('time_since_test_start').alias('time'),
    )
)

# Purchases joined to the offer completions that share (account_id, time).
# A purchase that matches a completion is labelled 'offer completed' and carries
# that completion's offer_id/reward; otherwise it stays a plain 'transaction'.
#
# (account_id, time) is not unique on the completion side: one purchase can
# complete several offers at once, and the left join then emits one row per
# completed offer. Each of those rows would carry the full purchase amount, so the
# later per-(account, time) sum would over-count spend. Each purchase is therefore
# tagged with a surrogate id, and its amount is kept only on the first joined row
# (the other rows still keep their own reward).
# NOTE(review): two purchases with the same (account_id, time) would each join to
# every completion at that instant, duplicating rewards. Confirm whether the data
# can contain such ties.
_purchase_rows = Window.partitionBy('_txn_id').orderBy(F.col('offer_id').asc_nulls_last())

df_offer_transactions = (
    df_transaction
    .withColumn('_txn_id', F.monotonically_increasing_id())
    .join(
        df_offer_completed,
        on=['account_id', 'time'],
        how='left'
    )
    .withColumn('_txn_row', F.row_number().over(_purchase_rows))
    # Keep the purchase amount on exactly one row per purchase.
    .withColumn(
        'amount',
        F.when(F.col('_txn_row') == 1, F.col('amount')).otherwise(F.lit(0))
    )
    .withColumn(
        'event',
        F.when(F.col('offer_id').isNotNull(), F.lit('offer completed')).otherwise(F.lit('transaction'))
    )
    .select('account_id', 'offer_id', 'time', 'event', 'amount', 'reward')
    # Purchases without a completion have a null reward from the left join.
    .fillna(0, subset=['amount', 'reward'])
)

def _select_offer_event(events_df, event_name):
    """Project the `event_name` rows of `events_df` onto the timeline schema.

    Output columns, in order: account_id, offer_id, time, event, amount, reward.
    amount and reward are zero-filled so the frame can be unioned with the
    purchase rows. Received/viewed events store the offer id under
    `value.offer id` (with a space), unlike completions, which use `value.offer_id`.
    """
    return (
        events_df
        .filter(F.col('event') == event_name)
        .select(
            F.col('account_id'),
            F.col('value.offer id').alias('offer_id'),
            F.col('time_since_test_start').alias('time'),
            F.col('event'),
            F.lit(0).alias('amount'),
            F.lit(0).alias('reward'),
        )
    )


df_offer_received = _select_offer_event(df_transaction_events, 'offer received')
df_offer_viewed = _select_offer_event(df_transaction_events, 'offer viewed')

In [135]:
# Event name -> column prefix, and offer_type -> column suffix, for the 0/1
# indicator columns '<prefix>_<suffix>_count' (e.g. 'received_bogo_count').
# Dict order fixes the output column order.
_EVENT_PREFIXES = {
    'offer received': 'received',
    'offer viewed': 'viewed',
    'offer completed': 'completed',
}
_OFFER_TYPE_SUFFIXES = {
    'bogo': 'bogo',
    'discount': 'discount',
    'informational': 'info',
}


def _flag(condition):
    """Cast a boolean Column to a 0/1 integer Column."""
    return condition.cast(T.IntegerType())


_timeline_flags = [_flag(F.col('event') == 'transaction').alias('transactions_count')]
for _event_name, _prefix in _EVENT_PREFIXES.items():
    for _offer_type, _suffix in _OFFER_TYPE_SUFFIXES.items():
        _timeline_flags.append(
            _flag((F.col('event') == _event_name) & (F.col('offer_type') == _offer_type))
            .alias(f'{_prefix}_{_suffix}_count')
        )

# One timeline of purchases, completions, receipts and views, with the offer type
# attached and one indicator column per (event, offer type), summed later.
# An offer row whose offer_id is not in df_offers_s gets a null offer_type, so its
# flags for its own event come out null rather than 0.
# The global orderBy that used to precede the join was dropped: Spark does not
# guarantee row order survives a join, and the running totals order explicitly
# through their window.
df_timeline = (
    df_offer_transactions
    # Align columns by name, not position, so a reordered select cannot silently mix columns.
    .unionByName(df_offer_received)
    .unionByName(df_offer_viewed)
    .join(
        df_offers_s,
        on='offer_id',
        how='left'
    )
    .select('*', *_timeline_flags)
)

In [158]:
# Running window: every row of the same account up to and including the current time.
w = Window.partitionBy('account_id').orderBy('time').rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Per-row column -> name of its running total. Dict order fixes the output column order.
_RUNNING_TOTALS = {
    'amount': 'total_amount',
    'reward': 'total_reward',
    'transactions_count': 'total_transactions',
    'received_bogo_count': 'total_received_bogo',
    'received_discount_count': 'total_received_discount',
    'received_info_count': 'total_received_info',
    'viewed_bogo_count': 'total_viewed_bogo',
    'viewed_discount_count': 'total_viewed_discount',
    'viewed_info_count': 'total_viewed_info',
    'completed_bogo_count': 'total_completed_bogo',
    'completed_discount_count': 'total_completed_discount',
    'completed_info_count': 'total_completed_info',
}
# Running totals rounded to 2 decimals.
_ROUNDED_TOTALS = {'amount', 'reward'}


def _running_total(column):
    """Cumulative sum of `column` over `w`; amount/reward are rounded to 2 decimals."""
    total = F.sum(column).over(w)
    return F.round(total, 2) if column in _ROUNDED_TOTALS else total


df_timeline_g = (
    df_timeline
    # Collapse events that share (account_id, time), so each window row is one instant.
    # Aggregate only the needed columns: a bare .sum() sums every numeric column and
    # leaves 'sum(...)'-named columns that have to be dropped afterwards.
    .groupBy('account_id', 'time')
    .agg(*[F.sum(column).alias(column) for column in _RUNNING_TOTALS])
    .select(
        'account_id',
        'time',
        *[_running_total(column).alias(total) for column, total in _RUNNING_TOTALS.items()],
    )
)

In [167]:
# Persist the per-account running-total timeline.
# DATA_PROCESSED_PATH already ends in '/', so the file name is appended without another slash.
# NOTE: on Windows this write needs HADOOP_HOME / hadoop.home.dir (a winutils install)
# set before the Spark session starts; otherwise it fails with
# "HADOOP_HOME and hadoop.home.dir are unset".
df_timeline_g.write.mode('overwrite').parquet(DATA_PROCESSED_PATH + 'transaction_timeline.parquet')

Py4JJavaError: An error occurred while calling o2791.parquet.
: java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:735)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:270)
	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:286)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:788)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:356)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:188)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:269)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:420)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:392)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:547)
	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:568)
	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:591)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:688)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.conf.Configuration.getTimeDurationHelper(Configuration.java:1907)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1867)
	at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
	at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
	at org.apache.hadoop.util.ShutdownHookManager$HookEntry.<init>(ShutdownHookManager.java:207)
	at org.apache.hadoop.util.ShutdownHookManager.addShutdownHook(ShutdownHookManager.java:304)
	at org.apache.spark.util.SparkShutdownHookManager.install(ShutdownHookManager.scala:181)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks$lzycompute(ShutdownHookManager.scala:50)
	at org.apache.spark.util.ShutdownHookManager$.shutdownHooks(ShutdownHookManager.scala:48)
	at org.apache.spark.util.ShutdownHookManager$.addShutdownHook(ShutdownHookManager.scala:153)
	at org.apache.spark.util.ShutdownHookManager$.<init>(ShutdownHookManager.scala:58)
	at org.apache.spark.util.ShutdownHookManager$.<clinit>(ShutdownHookManager.scala)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:242)
	at org.apache.spark.util.SparkFileUtils.createTempDir(SparkFileUtils.scala:103)
	at org.apache.spark.util.SparkFileUtils.createTempDir$(SparkFileUtils.scala:102)
	at org.apache.spark.util.Utils$.createTempDir(Utils.scala:94)
	at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:377)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:969)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:199)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:222)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1125)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1134)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:467)
	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:438)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:515)
	... 25 more
