##Equivalencias SAS - pyspark

Ejemplos de operaciones con datasets realizados en __SAS__ y su equivalencia en __pyspark__

###Lectura de fichero

Vamos a trabajar con el siguiente fichero. CUSTOMER_CHURN_TELCO. Mostramos un extracto:  
 
    customerID;gender;tenure;MultipleLines;MonthlyCharges;StreamingTV;DeviceProtection;Contract;Churn
    7590-VHVEG;Female;1;No phone service;29.85;No;No;Month-to-month;No
    5575-GNVDE;Male;34;No;56.95;No;Yes;One year;No
    3668-QPYBK;Male;2;No;53.85;No;No;Month-to-month;Yes
    7795-CFOCW;Male;45;No phone service;;No;Yes;One year;No
    9237-HQITU;Female;2;No;70.7;No;No;Month-to-month;Yes

Explicación de los campos:
* customerid: identificador de clientes  
* gender: género del cliente (male/female)  
* tenure: antigüedad del cliente en meses  
* MultipleLines: indica si cliente tiene varias líneas (Yes/No)  
* MonthlyCharges: cargos mensuales del cliente (factura)  
* StreamingTV: indica si el cliente tiene contratados servicios de TV (Yes/No)  
* DeviceProtection: indica si el cliente tiene un servicio de seguro (Yes/No)  
* Contract: tipo de contrato, duración (Two year, One year, Month-yo_month)  
* Churn: indica si el cliente ha causado baja (Yes/No)

El paso de __lectura en SAS__ es el siguiente:  
 
    DATA WORK.CUSTOMER1;
    LENGTH   customerID    $10  gender  $6   tenure  8
        MultipleLines  $20 MonthlyCharges  8 StreamingTV $20  
		DeviceProtection $20 Contract  $20 Churn $3 ;
    INFILE '/home/ficheros/CUSTOMER_CHURN_TELCO.txt'
        FIRSTOBS=2 DLM=';' DSD ;
    INPUT
        customerID  gender tenure MultipleLines MonthlyCharges   
        StreamingTV  DeviceProtection Contract Churn  ;
     RUN;

Obtenemos la equivalencia en __pyspark__

In [0]:
%fs ls dbfs:/FileStore/tables/

path,name,size
dbfs:/FileStore/tables/CUSTOMER_CHURN_TELCO.txt,CUSTOMER_CHURN_TELCO.txt,432871
dbfs:/FileStore/tables/SALES.csv,SALES.csv,60197
dbfs:/FileStore/tables/appl_stock.csv,appl_stock.csv,143130
dbfs:/FileStore/tables/people.csv,people.csv,34
dbfs:/FileStore/tables/ventas.txt,ventas.txt,50
dbfs:/FileStore/tables/ventas_ciudad.txt,ventas_ciudad.txt,57
dbfs:/FileStore/tables/ventas_ciudad2.txt,ventas_ciudad2.txt,92


Método __read__

In [0]:
df_customer1 = spark.read \
  .format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .option("delimiter", ";") \
  .load("dbfs:/FileStore/tables/CUSTOMER_CHURN_TELCO.txt")

In [0]:
df_customer1.show()

###Filtros

Realizamos filtrados sobre el dataset leído

El paso de __filtro en SAS__ es el siguiente. Realizado con __PASO DATA__:  
 
    DATA CUST_FIL;
    SET CUSTOMER1;
    WHERE Churn = 'Yes';
    RUN;
     
Realizado con __PROC SQL__:  
 
    PROC SQL;
    CREATE TABLE CUST_FIL AS
    (SELECT * FROM CUSTOMER1 WHERE Churn = 'Yes');
    QUIT;

Obtenemos la equivalencia en __pyspark__ empleano el método __where__

In [0]:
df_cust_fil = df_customer1.where("Churn == 'Yes'")
df_cust_fil.show(5)

En __pyspark__ se puede realizar también el filtrado mediante __SQL__  
Primero creamos la tabla con el método createOrReplaceTempView y después la consultamos con el método spark.sql

In [0]:
df_customer1.createOrReplaceTempView("t_customer1")

In [0]:
df_cust_fil = spark.sql("SELECT * FROM t_customer1 where Churn = 'Yes'")
df_cust_fil.show(5)

###Proyección  
#### Selección de campos

El paso de __proyección en SAS__ es el siguiente. Realizado con __KEEP__ que indica los campos a mantener:  
 
    DATA CUST_SEL (KEEP= customerID  gender tenure);
    SET CUSTOMER1;
    RUN;

Otra opción es la sentencia __DROP__ que indica los campos a eliminar:  
 
    DATA CUST_SEL (DROP= MultipleLines MonthlyCharges   
        	StreamingTV  DeviceProtection Contract Churn);
    SET CUSTOMER1;
    RUN;

Realizado con __PROC SQL__:  
 
    PROC SQL;
    CREATE TABLE CUST_SEL AS
    (SELECT customerID,  gender, tenure FROM CUSTOMER1 );
    QUIT;

En __pyspark__ empleamos el método __select__

In [0]:
df_cust_sel = df_customer1.select("customerID", "gender", "tenure")
df_cust_sel.show(5)

Otra opción es el método __drop__

In [0]:
df_cust_sel = df_customer1.drop("MultipleLines","MonthlyCharges", "StreamingTV", "DeviceProtection", "Contract", "Churn")
df_cust_sel.show(5)

Mediante __SQL__

###Columnas calculadas

En __SAS__ podemos calcular campos en un __PASO DATA__:  
 
    DATA CUSTOMER2;
    SET CUSTOMER1;
    TENURE_YEARS = TENURE/12;
    RUN;

O bien con __PROC SQL__:  
 
    PROC SQL;
    CREATE TABLE CUSTOMER2 AS
    (SELECT t.*, t.tenure/12 as tenure_years 
    FROM CUSTOMER1 t );
    QUIT;

En __pyspark__ empleamos las sql functions y el método withColumnn

In [0]:
from pyspark.sql.functions import col
df_customer2 = df_customer1.withColumn("tenure_years", col("tenure")/12 )
df_customer2.show(5)

Mediante __SQL__

In [0]:
df_customer2 = spark.sql("SELECT *, tenure/12 as tenure_years FROM t_customer1 ")
df_customer2.show(5)

###Ordenación

En __SAS__ realizamos la ordenación con un procedimiento __PROC SORT__:  
 
    PROC SORT DATA=CUSTOMER1;
    by customerID;
    run;

En __PySpark__ empleamos método __sort__

In [0]:
df_customer1.sort("customerID").show(5)

### Concatenación. Unión de datasets por filas (operación añadir, concatenar)

En __SAS__ podemos realizar la concatenación con un  __PASO DATA__:  

    DATA CUST_TOT;
    SET CUSTOMER1 CUST_FIL;
    RUN;

Otra opción es el procedimiento __PROC APPEND__:

    PROC APPEND BASE=CUSTOMER1 NEW=CUST_FIL;
    RUN;

Paso de concatenación en __PySpark__ con método __union__

In [0]:
df_cust_tot.count()

In [0]:
df_cust_tot = df_customer1.union(df_cust_fil)
df_cust_tot.show(5)

In [0]:
df_cust_tot.count()

### Agrupaciones

En __SAS__ podemos realizar la agrupación con un  __PROC SQL__:  

    PROC SQL;
    CREATE TABLE CUST_AG1 AS
    (SELECT CHURN, COUNT(customerID) as numcust, avg(tenure) as avg_ten, 
	sum(MonthlyCharges) as sum_mc
    from CUSTOMER1
    GROUP BY CHURN);
    QUIT;

Otra opción es el procedimiento __PROC MEANS__:

    PROC MEANS
    PROC MEANS DATA = CUSTOMER1 NWAY;
    CLASS Churn;
    VAR tenure MonthlyCharges;
    output out=CUST_AG(drop=_type_ _freq_)
      sum(MonthlyCharges) = sum_mc 
      mean(tenure) = avg_ten
      N = num_cust;
    RUN;

En __PySpark__ empleamos método __groupby__ y __agg__

In [0]:
import pyspark.sql.functions as f
df_cust_ag = df_customer1.groupBy("Churn").agg(f.count("CustomerId").alias("num_cust"),f.avg("tenure").alias("avg_tenu"),f.sum("MonthlyCharges").alias("su_mc"))
df_cust_ag.show(5)

### Joins

En __SAS__ podemos realizamos el cruce en un paso data con un  __MERGE__ y una ordenación previa:  

   
    PROC SORT DATA=CUSTOMER1;
    by customerID;
    run;

    PROC SORT DATA=CITY;
    by customerID;
    run;

    DATA CUSTOMER3;
    MERGE CUSTOMER1(in=a) CITY(in=b);
    by customerID;
    if a;
    run;

En __PySpark__ empleamos el método __join__

In [0]:
df_city = spark.createDataFrame([
    ("7590-VHVEG", "Chicago"),
    ("5575-GNVDE", "Boston"),
    ("3668-QPYBK", "New York"),
    ("7795-CFOCW", "Washington")
  ]) \
  .toDF("id_customer", "city")
df_city.show()

In [0]:
df_customer3 = df_customer1.join(df_city, df_customer1["customerID"] == df_city["id_customer"], "outer")
df_customer3.where("customerID in ('7590-VHVEG','5575-GNVDE')").show()

### Escritura a fichero

En __SAS__ podemos realizamos el cruce en un paso data:  

   
    data _null_;
    set CUSTOMER1;
    file "/home/ficheros/fcustomer.csv" dlm = ';';
    put customerID  gender tenure MultipleLines MonthlyCharges   
        StreamingTV  DeviceProtection Contract Churn;
    run;

O bien con el procedimiento __PROC EXPORT__

    proc export data=CUSTOMER1
     outfile="/home/ficheros/fcustomer.csv"  dbms=CSV;
    run;

En __PySpark__ empleamos el método __write__

In [0]:
%fs ls /tmp/test/fcustomer

path,name,size
dbfs:/tmp/test/fcustomer/_SUCCESS,_SUCCESS,0
dbfs:/tmp/test/fcustomer/_committed_2652053655009358260,_committed_2652053655009358260,111
dbfs:/tmp/test/fcustomer/_started_2652053655009358260,_started_2652053655009358260,0
dbfs:/tmp/test/fcustomer/part-00000-tid-2652053655009358260-4c54284e-9ade-40ed-8f13-abbc3f814490-2-1-c000.csv,part-00000-tid-2652053655009358260-4c54284e-9ade-40ed-8f13-abbc3f814490-2-1-c000.csv,426420


In [0]:
df_customer1.write \
  .format("csv") \
  .mode("overwrite") \
  .save("dbfs:/tmp/test/fcustomer")

### Tratamiento de fechas

En __SAS__ podemos crear fechas con la función __DATE()__ y usar funciones de fecha como __MONTH()__ y formatos con __FORMAT__:  

   
    data CUSTOMER4;
    set CUSTOMER1;
    fc_act = DATE();
    month_act = MONTH(fc_act);
    format fc_act yymmdd10.;
    run;

In [0]:
from pyspark.sql.functions import current_date, current_timestamp, month
df_customer4 = df_customer1.withColumn("fc_act", current_date() ).withColumn("month_act", month("fc_act"))
df_customer4.show(5)

__pendientes__: acceso BBDD (ejemplo Hive), fechas, ...

### Acceso a BBDD (Hive)

Se crea una __librería__ para acceder a __Hive__ y mediante la librería se pueden realizar pasos data desde SAS:

   
    libname hivesas hadoop server=hadoop srv user='user'; 

    data hivesas.t_customer;
    set CUSTOMER1;
    run;

Tablas del metastore de Hive

In [0]:
spark.sql("show tables").show()

Escribimos la tabla en Hive

In [0]:
df_customer1.write.saveAsTable("t_customer")