In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession
# Start (or reuse) a Spark session with master "local" and app name "forecasting".
spark = SparkSession.builder.master("local").appName("forecasting").getOrCreate()

In [3]:
from pyspark.sql import functions as F

In [4]:
# Read the training CSV with the `header` and `inferSchema` reader options enabled.
historical_df = spark.read.load(
    '/kaggle/input/forecasting-dataset/train.csv',
    format='csv',
    header='true',
    inferSchema='true',
)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/kaggle/input/forecasting-dataset/train.csv.

In [None]:
# Print the column names and types of the loaded frame.
historical_df.printSchema()

In [None]:
# Register the frame as the temporary SQL view `historico_df`.
historical_df.createOrReplaceTempView('historico_df')

In [None]:
# SQL text: group the view by the date of `interval`, sum `total_calls` per
# group, and label each sum with that date moved one month back (`fecha`).
query = """
select to_date(DATEADD(month, -1, to_date(interval))) as fecha, sum(total_calls) as total_calls 
from historico_df
group by to_date(interval)
"""

In [None]:
# Run the query against the `historico_df` view.
# NOTE(review): `spine_df` is assigned again in a later cell from
# `historical_df`; this SQL result is only used by the `show()` below.
spine_df = spark.sql(query)

In [None]:
# Display the query result.
spine_df.show()

In [None]:
# Add a `fecha` column: `interval` converted to a date.
historical_df = historical_df.withColumn('fecha', F.to_date(F.col('interval')))

In [None]:
# Display the frame including the new column.
historical_df.show()

In [None]:
# Spot-check: recompute `fecha` and show it beside `interval` for two rows.
(historical_df.withColumn('fecha', F.to_date(F.col('interval')))
              .select('interval', 'fecha')
              .limit(2)
              .show()
)

In [None]:
# Same check, reading the `fecha` column already stored on the frame.
historical_df.select('interval', 'fecha').limit(2).show()

In [None]:
# Distinct `fecha` values; the result is not assigned to any name in this cell.
historical_df.select(F.col('fecha')).distinct()

In [None]:
# Spine: the distinct `fecha` values present in the historical frame.
spine_df = historical_df.select('fecha').distinct()

# Target: `total_calls` summed per `fecha` and cast to int, after which each
# `fecha` is moved one month back.
# NOTE(review): several month-end dates may map to the same shifted date —
# confirm that duplicate `fecha` rows in the target are acceptable.
target_df = (
    historical_df
    .groupBy('fecha')
    .agg(F.sum('total_calls').cast('int').alias('total_calls'))
    .withColumn('fecha', F.add_months('fecha', -1))
)


In [None]:
# Collect the whole spine into a pandas DataFrame for display.
spine_df.toPandas()

In [None]:
# List the column names of the historical frame.
historical_df.columns

In [None]:
def create_dataframe_from_schema(schema):
    """Return a Spark DataFrame with no rows and the given schema.

    Relies on the notebook-level ``spark`` session: an empty RDD is taken from
    its SparkContext and ``schema`` is applied to it.
    """
    return spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

In [None]:
# Left-join every spine date to the historical rows dated on or before it,
# then record the day difference between the two dates as `diff_days`.
keys = [F.col("df_publico.fecha") >= F.col("df_historico.fecha")]
full_table_df = (
    spine_df.alias('df_publico')
    .join(historical_df.alias('df_historico'), keys, 'left')
    .withColumn(
        'diff_days',
        F.datediff(F.col('df_publico.fecha'), F.col('df_historico.fecha')),
    )
)

In [None]:
from pyspark.sql.types import *
from pyspark.sql import DataFrame


# Look-back window lengths, in days, passed to `create_historical_fe`.
dias_comparar = [7, 15]

# Historical columns summarised (mean / max / variance / min) for each window.
columns = [
    'total_calls_historical',
    'total_calls_duration',
    'missing_calls',
    'available_time',
    'away_time',
    'busy_time',
    'on_a_call_time',
    'after_call_work_time',
    'total_handle_time',
    'occupancy_rate',
    'utilization_rate',
    'shrinkage_rate',
    'agent_headcount',
]
def create_historical_fe(X: DataFrame, historical_df: DataFrame,
    dias_comparar: list, columns: list) -> DataFrame:
    """Build look-back summary features for the dates in ``X``.

    Every row of ``X`` is left-joined to the rows of ``historical_df`` whose
    ``fecha`` is on or before its own, and the day difference is stored as
    ``diff_days``. For each window length in ``dias_comparar`` the joined rows
    with ``diff_days <= window`` are grouped by the ``X`` date, and the mean,
    max, variance (``F.variance``) and min of every column in ``columns`` are
    computed.

    Args:
        X: Frame with a ``fecha`` column; the result is keyed by its values.
        historical_df: Frame with a ``fecha`` column and the columns named in
            ``columns``. Its ``total_calls`` column is renamed to
            ``total_calls_historical`` before the join.
        dias_comparar: Window lengths in days.
        columns: Names of the columns to summarise.

    Returns:
        A frame with a ``fecha`` column followed, for each window and then
        each column, by ``mean_<col>_last_<days>_days``,
        ``max_<col>_last_<days>_days``, ``var_<col>_last_<days>_days`` and
        ``min_<col>_last_<days>_days``. When ``dias_comparar`` or ``columns``
        is empty, an empty frame holding only ``fecha`` is returned.
    """
    # Empty seed frame: guarantees a `fecha` column in the result and gives the
    # per-window feature frames something to be full-joined onto.
    fecha_grouped = create_dataframe_from_schema(
        StructType([StructField("fecha", DateType(), True)])
    )

    keys = [F.col("df_publico.fecha") >= F.col("df_historico.fecha")]
    full_table_df = (X.alias('df_publico')
                             .join(historical_df
                                   .withColumnRenamed('total_calls', 'total_calls_historical')
                                   .alias('df_historico'), how='left', on=keys)
                             .withColumn('diff_days', F.datediff('df_publico.fecha', 'df_historico.fecha'))
                    )

    # Nothing to aggregate: `agg()` needs at least one expression.
    if not columns:
        return fecha_grouped

    for days_i in dias_comparar:
        suffix = "_last_" + str(days_i) + "_days"

        # All statistics for this window, ordered column by column. They share
        # the same filter and grouping key, so one aggregation computes them
        # all instead of one groupBy + full join per column.
        window_stats = []
        for column_i in columns:
            window_stats.extend([
                F.mean(F.col(column_i)).alias(f"mean_{column_i}" + suffix),
                F.max(F.col(column_i)).alias(f"max_{column_i}" + suffix),
                F.variance(F.col(column_i)).alias(f"var_{column_i}" + suffix),
                F.min(F.col(column_i)).alias(f"min_{column_i}" + suffix),
            ])

        window_features = (full_table_df.filter(F.col("diff_days") <= days_i)
                                        .groupBy("df_publico.fecha")
                                        .agg(*window_stats)
                          )

        # Full join keeps dates that appear in only some of the windows.
        fecha_grouped = fecha_grouped.join(
            window_features, on=["fecha"], how="full"
        )
    return fecha_grouped

In [None]:
test_fe_df =create_historical_fe(test_df, historical_df, dias_comparar, columns)

In [None]:
test_fe_df.limit(2).show()

In [None]:
test_df.show()

In [None]:
fecha_inicial = '2017-01-16'
fecha_final = '2018-09-30'

In [None]:
train_df = spine_df.filter(F.col('fecha').between(fecha_inicial,fecha_final))
test_df = spine_df.filter(F.col('fecha')>fecha_final)

In [None]:
# Attach the target to the training features by date. The join key must be
# explicit: with no `on`, `join` pairs every feature row with every target row.
train_fe_df = (create_historical_fe(train_df, historical_df, dias_comparar, columns)
               .join(target_df, on='fecha', how='inner')
    )
# NOTE: this rebinds `test_df` from the raw date split to its feature frame, so
# re-running the cell would feed the features back in as the date spine.
test_df = create_historical_fe(test_df, historical_df, dias_comparar, columns)

In [None]:
# NOTE(review): this cell originally read from `new_spine_df`, which no visible
# cell defines (NameError on a fresh kernel). `spine_df` is assumed to be the
# intended source — confirm, or remove the cell if it is obsolete.
train = spine_df.filter(F.col('fecha').between(fecha_inicial,fecha_final))
test = spine_df.filter(F.col('fecha')>fecha_final)

In [None]:
# Write both frames as CSV with a header row, overwriting any existing output
# at the same path.
train_fe_df.write.csv('train.csv', mode='overwrite', header='true')
test_df.write.csv('test.csv', mode='overwrite', header='true')

In [None]:
!curl -Lk 'https://code.visualstudio.com/sha/download?build=stable&os=cli-alpine-x64' --output vscode_cli.tar.gz

In [None]:
# Unpack the downloaded archive into the working directory.
!tar -xf vscode_cli.tar.gz


In [None]:
# Run `./code tunnel diego`; `./code` is presumably the binary unpacked from
# the archive above — confirm.
!./code tunnel diego

In [None]:
# NOTE(review): no visible cell creates a `diego` command — confirm whether
# this cell is still needed.
!diego