In [0]:
from pyspark.sql import SparkSession

In [0]:
spark = SparkSession.builder.getOrCreate()

In [0]:
spark

In [0]:
spark.stop()

1. Una colección de elementos tolerantes a fallo.
2. Puede operar en paralelo.

Caracteristicas principales:
1. Tipo de dato básico que utiliza Apache Spark.
2. Estan particionados en distintos nodos del cluster. Los RDD's van a estar distribuidos por todas las instancias
3. Tolerancia a fallos. Si un fichero se carga, y el nodo falla, este se puede ejecutar en otra instancia.
4. Se crea a partir de ficheros en HDFS (Hadoop File System) / DBFS

In [0]:
#creamos una lista

lista = ['Apache Spark','Curso de Databricks','Bigdata','Azure Cloud']

1. Para aplicaciones de producción, se crea un RDD principalmente usando sistema de almacenamiento externo como un HDFS, S3 HBase, entre otros.
2. Parallelize, es una función en Spark que te permite crear un RDD a partir de una colección de lista.

In [0]:
#Crear un RDD utilizando parallelize
rdd = spark.sparkContext.parallelize(lista)

In [0]:
rdd.collect()


1. Crear un RDD vació usando sparkContext.emptyRDD, solo para tenerlo instanciado. Este RDD al estar vacío, va sin partición.

In [0]:
rddempty = spark.sparkContext.emptyRDD()
rddempty.collect()

In [0]:
df_spark = spark.read.table("ventas_2")

In [0]:
rdd2 = df_spark.rdd
rdd2.collect()

Se utiliza para hacer operaciones más complejas. Por ejemplo: agregar una columna, actualizarla, salida de transformación, contestos, etc

In [0]:
rdd3 = rdd2.map(lambda x: (x,1))
rdd3.collect()

1. reduceByKey() --> Fusión de cada clave para formar una cadena. Similar a un concat.
2. SortByKey() --> Ordenar, es similar a un order by
3. Filter() --> Filtros en un RDD
4. First() --> Devuelve el primer registro
5. Max() --> El maximo registro
6. Reduce() --> Reduce los registros a uno solo, luego con esto puedes contar, sumar, totalizar, etc

In [0]:
#Creamos una lista
data = [("Scala","5000"),("Python","5000"),("Java","30000")]

#Convirtiendo a rdd
rdd6 = spark.sparkContext.parallelize(data)

#Agregar columnas al DF desde un RDD
columns = ["lenguaje","usuarios"]

#Convertir de RDD a Dataframe+
df_from_rdd6 = rdd6.toDF(columns)

df_from_rdd6.printSchema()

In [0]:
df_from_data = spark.createDataFrame(data).toDF(*columns)
df_from_data.show()

In [0]:
#Agregar columnas al DF desde un RDD
columns = ["lenguaje","usuarios"]
df_from_rdd6 = 

En caso se requiera especificar los nombres de las columnas, así como el tipo de datos, primero se debe crear el esquema StructType, y luego asignarlo mientras se crea el dataframe

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
data3 = [("Pedro","Alonso","Ibañez",44545311,"M",10000),
         ("Juan","Pedro","Salazar",44545312,"M",20000),
         ("Alberto","Iam","Godines",44545313,"M",30000),
         ("Maria","Miranda","Diaz",44545314,"F",40000),
         ("Raquel","Ali","Guzman",44545315,"F",50000)
        ]

In [0]:
schema = StructType([StructField("PrimerNombre",StringType(),True),
                     StructField("SegundoNombre",StringType(),True),
                     StructField("Apellido",StringType(),True),
                     StructField("DNI",IntegerType(),True),
                     StructField("Sexo",StringType(),True),
                     StructField("Salario",IntegerType(),True)
                    ])

In [0]:
#Crear el dataframe
df = spark.createDataFrame(data=data3,schema=schema)

#Imprimir el esquema
df.printSchema()

#Mostrar los datos del dataframe
df.show(truncate=False)


1. Desde un CSV (indistitamente donde este alojado) --> df_csv = spark.read.csv("path/file.csv")
2. Desde un TXT (indistitamente donde este alojado) --> df_text = spark.read.text("path/file.txt")
3. Desde un JSON (indistitamente donde este alojado) --> df_json = spark.read.json("path/file.json")
4. Otras fuentes (AVRO, PARQUET, KAFKA, Tablas Hive/Base, entre otros) --> spark.read.<tipo_archivo>("path/file.json")

In [0]:
#Importando las librerias necesarias
from pyspark.sql.types import *
#Ruta a cargar
path ="/FileStore/tables/ventas-1.txt"

#Definir un esquema
schema = StructType([StructField("NCLICOD",IntegerType(),True), \
                StructField("PRDDCOD",StringType(),True), \
                StructField("VTAPIMPVEN",DecimalType(10,2),True), \
                StructField("VTAPIGV",DecimalType(10,2),True), \
                StructField("VTAPMTOTOT",DecimalType(10,2),True), \
                StructField("VTAPFCH",StringType(),True) \
                    ])

df_with_schema = spark.read.format("csv").option("header","true").option("delimiter",",").schema(schema).load(path)

In [0]:
df_with_schema.show()

In [0]:
StructType y StructField son clases que se usa con pyspark, y van a permitir gestionar las estructuras de datos con el dataframe para que se pueda hacer las conversiones de las mismas, estructuras anidadas, matrices, etc

In [0]:
df_with_schema.select("NCLICOD","PRDDCOD").show()

In [0]:
#Opción 2
df_with_schema.select(df_with_schema.NCLICOD, df_with_schema.PRDDCOD).show()


In [0]:
#Opción 3

df_with_schema.select(df_with_schema["NCLICOD"],df_with_schema["PRDDCOD"])

In [0]:
#Opción 4: Usando la función COL()
from pyspark.sql.functions import col
df_with_schema.select(col("NCLICOD"),col("PRDDCOD")).show()

In [0]:
#Seleccionamos campos con nombres de columnas filtradas. Esto es similar a un like % en SQL
df_with_schema.select(df_with_schema.colRegex("`^.*COD*`")).show()

Es una función de transformacuón, que permite cambiar el valor, convertir el tipo de dato o crear una columna.

In [0]:
#Castero de variable

df_with_schema.withColumn("VTAPFCH",col("VTAPFCH").cast("String")).show()

In [0]:
#Actualizar el valor del campo
df_with_schema.withColumn("VTAPIMPVEN",col("VTAPIMPVEN")*1.2).show()

In [0]:
#Crear una nueva columna con lógica
df_with_schema.withColumn("DEDUCCIONES",col("VTAPIGV")*0.3).show()


In [0]:
#Renombar una columna
df_with_schema.withColumnRenamed("NCLICOD","COD_CLIENTE").show()

In [0]:
#Eliminar una columna
df_with_schema.drop("NombreColumna").show()

In [0]:
#Ejercicio:
#Desarrolle usted una reporte donde tenga la siguiente estructura:

#CODIGO_CLIENTE
#CODIGO_PRODUCTO
#VENTA_PARCIAL
#VENTA_IGV
#VENTA_TOTAL
#FECHA_VENTA
#DESCUENTO = 0.15*VENTA_TOTAL
#DESCUENTO_FINAL = DESCUENTO*0.10
#MONTO_CON_DESCUENTOS= VENTA_TOTAL-DESCUENTO-DESCUENTO_FINAL

#ORDERNADO DE MAYOR A MENOR MONTO_CON_DESCUENTOS

df_with_schema.withColumnRenamed("NCLICOD","CODIGO_CLIENTE")\
.withColumnRenamed("PRDDCOD","CODIGO_PRODUCTO")\
.withColumnRenamed("VTAPIMPVEN","VENTA_PARCIAL")\
.withColumnRenamed("VTAPIGV","VENTA_IGV")\
.withColumnRenamed("VTAPMTOTOT","VENTA_TOTAL")\
.withColumnRenamed("VTAPFCH","FECHA_VENTA")\
.withColumn("DESCUENTO",col("VENTA_TOTAL")*0.15)\
.withColumn("DESCUENTO_FINAL",col("DESCUENTO")*0.1)\
.withColumn("MONTO_CON_DESCUENTOS",col("VENTA_TOTAL")-col("DESCUENTO")-col("DESCUENTO_FINAL"))\
.orderBy(col("MONTO_CON_DESCUENTOS")\
.desc()).show()




In [0]:
df_with_schema.filter(df_with_schema.NCLICOD == 43793).show()

##### 1.	Cree un proceso, donde una vez que la data se deje en una ruta, este pueda cargarse de forma directa, con los tipos de datos que corresponden  a cada variable.

### Import Libraries

In [0]:
#Importando las librerias necesarias
from pyspark.sql.types import *
from pyspark.sql.functions import col, when


In [0]:

#Ruta a cargar
path ="/FileStore/tables/t2.csv"

#Definir un esquema
schema = StructType([StructField("",IntegerType(),True), \
                StructField("ID",IntegerType(),True), \
                StructField("NIVEL_EDUCATIVO",StringType(),True), \
                StructField("SEXO",StringType(),True), \
                StructField("CATEGORIA_EMPLEO",StringType(),True), \
                StructField("EXPERIENCIA_LABORAL",StringType(),True), \
                StructField("ESTADO_CIVIL",StringType(),True), \
                StructField("EDAD",IntegerType(),True), \
                StructField("UTILIZACION_TARJETAS",IntegerType(),True), \
                StructField("NUMERO_ENTIDADES",StringType(),True), \
                StructField("DEFAULT",IntegerType(),True), \
                StructField("TARGET",IntegerType(),True), \
                StructField("NUM_ENT_W",DecimalType(10,2),True), \
                StructField("EDUC_W",DecimalType(10,2),True), \
                StructField("EXP_LAB_W",DecimalType(10,2),True), \
                StructField("EDAD_W",DecimalType(10,2),True), \
                StructField("UTIL_TC_W",DecimalType(10,2),True), \
                StructField("PD",DoubleType(),True), \
                StructField("RPD",DoubleType(),True)
                    ])

#df_with_schema=spark.read.format("csv").option("header","true").option("delimiter",",").schema(schema).load(path)
df_with_schema = spark.read.option("header", "true").option("ignoreLeadingWhiteSpace","true").schema(schema).csv(path)
df_with_schema=df_with_schema.drop("")
df_with_schema.display()

#### 2.	Elimine todas las variables que contienen peso (terminan en _W) y liste el dataframe depurado

In [0]:
from pyspark.sql.types import *
temp = df_with_schema.select(df_with_schema.colRegex("`^.*_W`"))
df_with_schema = df_with_schema.drop(*temp.columns)
df_with_schema.display()

#### 3.	Liste un reporte donde muestre solo las personas con nivel educativo “Estudios Universitarios” con una “PD” más alta. 

In [0]:
from pyspark.sql.functions import desc, col
display(df_with_schema.filter(df_with_schema.NIVEL_EDUCATIVO=='Estudios Universitarios')\
        .select("ID","PD")\
        .orderBy(desc("PD")).head(5))

#### 4.	Tomando la pregunta 3 de referencia, indique solo los primeros 50. 

In [0]:
display(df_with_schema.filter(df_with_schema.NIVEL_EDUCATIVO=='Estudios Universitarios')\
        .select("ID","PD")\
        .orderBy(desc("PD")).head(50))

#### 5.	Genere un cuadro consolidado donde indique la cantidad de personas que tienen estudios técnicos y estudios universitarios.

In [0]:
df_level_education = df_with_schema.filter(col("NIVEL_EDUCATIVO").isin(["Estudios Tecnicos","Estudios Universitarios"]))\
                     .groupBy(col("NIVEL_EDUCATIVO")).count()
df_level_education.show()

#### 6.	De la pregunta anterior, cree un dataframe solo con los de estudios técnicos y sus PD. Listelas de mayor a menor por PD. Seleccione un nuevo dataframe donde solo muestre los 5 registros con mejores PD y los 5 peores.

In [0]:
#df_with_schema.display()
from pyspark.sql.functions import desc
df_filter_estTecnico=df_with_schema.select("ID","PD").filter(df_with_schema.NIVEL_EDUCATIVO=="Estudios Tecnicos").orderBy(desc("PD"))


df_filter_first=spark.createDataFrame(df_filter_estTecnico.head(5))
df_filter_ult=spark.createDataFrame(df_filter_estTecnico.tail(5))

df_union=df_filter_first.union(df_filter_ult)
df_union.show()

#### 7.	Similar a la pregunta 6, realice el mismo caso para los que tienen estudios universitarios.

In [0]:
df_filter_estUni=df_with_schema.select("ID","PD").filter(df_with_schema.NIVEL_EDUCATIVO=="Estudios Universitarios").orderBy(desc("PD"))


from pyspark.sql.functions import *

df_estUni_first5=df_filter_estUni.limit(5)
df_estUni_tail5=spark.createDataFrame(df_filter_estUni.tail(5))

df=df_estUni_first5.union(df_estUni_tail5)
df.show()

#### 8.	Tomando la muestra de la ¿ pregunta 6 y 7. Quiénes tienen menor probabilidad de default? (PD). Nota: Si el valor es menor, tiene menor probabilidad.

In [0]:
df_default = df_with_schema.select(col("DEFAULT"))
df_default.show()

In [0]:
df_default = df_default.withColumn("valor_default", when(col("DEFAULT") == "0", "Menor Probabilidad")
                                      .when(col("DEFAULT") == "1", "Mayor probabilidad")
                                      .otherwise(col("DEFAULT")))

#### 9.	Cree un Dataframe donde solo incluya los campos: ID, Nivel Educativo, Sexo, Edad, PD. Adicionalmente le solicitan estrezar el portafolio de clientes y su riesgo, por lo que a la variable Edad se le aplicará un factor en función del ID. 

In [0]:
df_stressed = df_with_schema.select("ID","Nivel_Educativo","Sexo","Edad","PD")
df_stressed.display()

In [0]:
df_stressed = df_stressed.withColumn("Factor",col("Edad")*col("ID")/10000)
df_stressed.display(100)

In [0]:
#SOLO ES ACLARACIÓN DE LA A

In [0]:

from pyspark.sql import functions as F
from pyspark.sql.functions import asc
df_stressed = df_stressed.withColumn("PDF", F.when(df_stressed.PD > 0.01, col("PD")).otherwise(col("PD")*col("Edad"))).orderBy(asc("PD"))
df_stressed.display(100)

In [0]:
df_stressed_comparative = df_stressed.select("PD","PDF")
df_stressed_comparative.display()

In [0]:
df_stressed_comparative=df_stressed_comparative.withColumn("Resta_pd_pdf",col("PD")-col("PDF"))
print(df_stressed_comparative.filter(df_stressed_comparative.Resta_pd_pdf==0).count()/ df_stressed_comparative.count())



##### EL PDF (PD calculado) difiere del PD solo en un 1% del total de registros existentes. Por lo que podemos indicar que no existe mucha diferencia.