### Inicio de la sesion de Spark

Al no estar en un **cluster EMR**, hay que utilizar pyspark de manera local en el notebook de SageMaker. 
Por ello se tiene que ejecutar el siguiente script:


In [2]:
import os
import boto3

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())

# See the SageMaker Spark Github to learn how to connect to EMR from a notebook instance
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local[*]").getOrCreate()
    
spark

# Hadoop Configuration
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 's3.us-east-1.amazonaws.com')

### Librerias para manejo de Dataframes

In [7]:
import pyspark.sql.functions as f

### Lectura de archivos parquet

Considerar que el role con el que se creo el notebook de SageMaker es *arn:aws:iam::639556434474:role/BigData-SageMakerCrossAccountS3-Role*, solo con ese role se podra leer los archivos en QAS

In [35]:
df = spark.read\
    .parquet('s3a://belc-bigdata-functional-dlk-qas/analitico/GlueDatabase/functional-offline/dwh_fstaebecam/')\
    .filter((f.col('codpais') == 'CR') & (f.col('aniocampana') == '201918'))

### Consultoras activas que pasaron pedido

In [37]:
df.filter(f.col('flagpasopedido')==1)\
    .groupBy('codpais','aniocampana')\
    .agg(f.count('codebelista')).show()

+-------+-----------+------------------+
|codpais|aniocampana|count(codebelista)|
+-------+-----------+------------------+
|     CR|     201918|             26744|
+-------+-----------+------------------+



### Primera y Ultima campaña de las consultoras como Activas

In [19]:
df_ebelista = spark.read\
    .parquet('s3a://belc-bigdata-functional-dlk-qas/analitico/GlueDatabase/functional-offline/dwh_fstaebecam/')\
    .filter((f.col('codpais') == 'PA'))

In [34]:
df_ebelista.filter(f.col('flagactiva')==1)\
    .groupBy('codpais','codebelista')\
    .agg(f.min(f.col('aniocampana')),f.max(f.col('aniocampana')))\
    .show()

+-------+-----------+----------------+----------------+
|codpais|codebelista|min(aniocampana)|max(aniocampana)|
+-------+-----------+----------------+----------------+
|     PA|  200965116|          201612|          201704|
|     PA|  200609689|          201501|          201618|
|     PA|  200939336|          201608|          201710|
|     PA|  200963512|          201611|          201901|
|     PA|  200874137|          201517|          201714|
|     PA|  200847441|          201513|          201713|
|     PA|  200648031|          201501|          202008|
|     PA|  200903811|          201604|          201701|
|     PA|  200446933|          201501|          201711|
|     PA|  200627768|          201501|          201711|
|     PA|  200829700|          201511|          201711|
|     PA|  200902939|          201605|          202008|
|     PA|  200706889|          201501|          201910|
|     PA|  200803221|          201507|          202004|
|     PA|  200900911|          201603|          

### Suma total del pedido de la consultora XXXXXX en la campaña 202001

In [17]:
df_fvtaproebecam = spark.read\
    .parquet('s3a://belc-bigdata-functional-dlk-qas/analitico/GlueDatabase/functional-offline/dwh_fvtaproebecam/')\
    .filter((f.col('codpais') == 'PA') & (f.col('aniocampana') == '202001'))

In [39]:
df_fvtaproebecam.filter((f.col('aniocampana') == f.col('aniocampanaref')))\
    .groupBy('codpais','aniocampana','codebelista')\
    .agg(f.sum(f.col('realvtamnneto')))\
    .show()

+-------+-----------+-----------+------------------+
|codpais|aniocampana|codebelista|sum(realvtamnneto)|
+-------+-----------+-----------+------------------+
|     PA|     202001|  201272548|          96.64000|
|     PA|     202001|  201349702|         108.98000|
|     PA|     202001|  201360242|         338.39000|
|     PA|     202001|  201351014|         105.70000|
|     PA|     202001|  201340349|         103.21000|
|     PA|     202001|  200578661|         114.96000|
|     PA|     202001|  201301637|         102.91000|
|     PA|     202001|  201302145|          93.27000|
|     PA|     202001|  201084547|         125.16000|
|     PA|     202001|  201261651|         129.41000|
|     PA|     202001|  201294134|          88.58000|
|     PA|     202001|  201375118|          81.09000|
|     PA|     202001|  200960718|         195.46000|
|     PA|     202001|  201348293|         111.59000|
|     PA|     202001|  201269466|          95.41000|
|     PA|     202001|  201375002|         163.