# Pyspark with Large Txt Files

## Libraries

In [64]:
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as pyspfunct

## Creamos la sesión de Spark

In [7]:
# Build the notebook's Spark session, or reuse one that is already running.
# NOTE(review): on Spark 3.x the key "spark.sql.execution.arrow.enabled" is
# deprecated in favor of "spark.sql.execution.arrow.pyspark.enabled" — confirm
# the target Spark version before renaming it.
sc = (
    SparkSession.builder
    .appName("PysparkLargeTxt")
    .config("spark.sql.shuffle.partitions", "50")        # partitions used by shuffles
    .config("spark.driver.maxResultSize", "5g")          # cap on results collected to the driver
    .config("spark.sql.execution.arrow.enabled", "true") # Arrow-based Spark <-> pandas conversion
    .getOrCreate()
)

## Cargamos los archivos de texto

In [151]:
# Directory holding the raw .lst exports. Kept in one place so the notebook can
# be pointed at another machine/folder by editing a single line.
INPUT_DIR = 'C:/Users/WIN/Pyspark_Codes/1_input'

# Each file is read as plain text: one row per line, in a single string
# column named "value".
df_boletero_1 = sc.read.text(INPUT_DIR + '/ibm1-834953.lst')
df_boletero_2 = sc.read.text(INPUT_DIR + '/ibm2-834953.lst')

## Verificamos las columnas que contiene cada dataset

In [152]:
# Inspect the schema of the first raw dataset.
df_boletero_1.printSchema()

root
 |-- value: string (nullable = true)



In [153]:
# Inspect the schema of the second raw dataset.
df_boletero_2.printSchema()

root
 |-- value: string (nullable = true)



## Verificamos la cantidad de registros por cada dataset

In [147]:
# Number of rows (text lines) in the first dataset.
df_boletero_1.count()

20000

In [148]:
# Number of rows (text lines) in the second dataset.
df_boletero_2.count()

60196

## Vemos los datos que contiene cada dataset

In [156]:
# Preview the first 5 raw lines of the first dataset (long strings are truncated in the display).
df_boletero_1.show(5)

+--------------------+
|               value|
+--------------------+
|12861368HUAMAN RO...|
|10002867OGOSI ATU...|
|10002866OGOSI CON...|
|10002865RUIZ ROMA...|
|10002864CIPRIANO ...|
+--------------------+
only showing top 5 rows



In [155]:
# Preview the first 5 raw lines of the second dataset (long strings are truncated in the display).
df_boletero_2.show(5)

+--------------------+
|               value|
+--------------------+
|10600738INCA ISUI...|
|10834151ZAMALLOA ...|
|13001679CABRERA R...|
|13001677CABRERA R...|
|13001676CABRERA R...|
+--------------------+
only showing top 5 rows



## Separamos la información necesaria de la gran columna "value"

In [158]:
# Slice the two needed fields out of the fixed-width "value" line.
# substring() positions are 1-based; a negative position counts back from the
# end of the string.
df_boletero_2 = (
    df_boletero_2
    # 7 characters starting at position 2 (the first character is skipped)
    .withColumn('suministro', pyspfunct.substring('value', 2, 7))
    # the single character located 3507 positions before the end of the line
    .withColumn('repartodigital', pyspfunct.substring('value', -3507, 1))
)

In [157]:
# Slice the two needed fields out of the fixed-width "value" line.
# substring() positions are 1-based; a negative position counts back from the
# end of the string.
df_boletero_1 = (
    df_boletero_1
    # 7 characters starting at position 2 (the first character is skipped)
    .withColumn('suministro', pyspfunct.substring('value', 2, 7))
    # the single character located 3507 positions before the end of the line
    .withColumn('repartodigital', pyspfunct.substring('value', -3507, 1))
)

In [160]:
# Check the two extracted columns on a 5-row sample of the first dataset.
df_boletero_1.show(5)

+--------------------+----------+--------------+
|               value|suministro|repartodigital|
+--------------------+----------+--------------+
|12861368HUAMAN RO...|   2861368|             N|
|10002867OGOSI ATU...|   0002867|             N|
|10002866OGOSI CON...|   0002866|             N|
|10002865RUIZ ROMA...|   0002865|             N|
|10002864CIPRIANO ...|   0002864|             N|
+--------------------+----------+--------------+
only showing top 5 rows



In [161]:
# Check the two extracted columns on a 5-row sample of the second dataset.
df_boletero_2.show(5)

+--------------------+----------+--------------+
|               value|suministro|repartodigital|
+--------------------+----------+--------------+
|10600738INCA ISUI...|   0600738|             N|
|10834151ZAMALLOA ...|   0834151|             N|
|13001679CABRERA R...|   3001679|             N|
|13001677CABRERA R...|   3001677|             N|
|13001676CABRERA R...|   3001676|             N|
+--------------------+----------+--------------+
only showing top 5 rows



## Unimos los dos dataframes en uno y eliminamos la columna "value"

In [165]:
# Stack both datasets and discard the raw text line, keeping only the
# extracted fields. union() matches columns by position, so both frames
# must list their columns in the same order.
df_boletero = df_boletero_1.union(df_boletero_2).drop('value')

In [166]:
# Row count after the union; it should equal the sum of the two input counts.
df_boletero.count()

80196

In [168]:
# Preview the combined dataset now that the raw "value" column is gone.
df_boletero.show(5)

+----------+--------------+
|suministro|repartodigital|
+----------+--------------+
|   2861368|             N|
|   0002867|             N|
|   0002866|             N|
|   0002865|             N|
|   0002864|             N|
+----------+--------------+
only showing top 5 rows



In [169]:
# Both extracted columns are still strings at this point.
df_boletero.printSchema()

root
 |-- suministro: string (nullable = true)
 |-- repartodigital: string (nullable = true)



## Cambiamos el tipo de dato de la columna suministro a integer

In [170]:
# Convert suministro from string to integer. Leading zeros are dropped by the
# cast (e.g. '0002867' becomes 2867).
df_boletero = df_boletero.withColumn(
    'suministro',
    pyspfunct.col('suministro').cast('int'),
)

In [171]:
# Confirm that suministro is now an integer column.
df_boletero.printSchema()

root
 |-- suministro: integer (nullable = true)
 |-- repartodigital: string (nullable = true)



In [173]:
# Preview the data after the cast.
df_boletero.show(5)

+----------+--------------+
|suministro|repartodigital|
+----------+--------------+
|   2861368|             N|
|      2867|             N|
|      2866|             N|
|      2865|             N|
|      2864|             N|
+----------+--------------+
only showing top 5 rows



## Query al dataset

In [175]:
# Frequency of each repartodigital value; a quick way to spot unexpected categories.
df_boletero.groupBy("repartodigital").count().show()

+--------------+-----+
|repartodigital|count|
+--------------+-----+
|             N|80168|
|              |    1|
|             S|   27|
+--------------+-----+



In [176]:
# Sample of rows whose repartodigital flag is 'N'.
df_boletero.filter("repartodigital == 'N'").show(5)

+----------+--------------+
|suministro|repartodigital|
+----------+--------------+
|   2861368|             N|
|      2867|             N|
|      2866|             N|
|      2865|             N|
|      2864|             N|
+----------+--------------+
only showing top 5 rows



In [177]:
# Sample of rows whose repartodigital flag is 'S'.
df_boletero.filter("repartodigital == 'S'").show(5)

+----------+--------------+
|suministro|repartodigital|
+----------+--------------+
|   1331193|             S|
|   2349333|             S|
|   2269688|             S|
|    893463|             S|
|   1765877|             S|
+----------+--------------+
only showing top 5 rows



In [179]:
# Rows whose repartodigital flag is a single blank space (neither 'N' nor 'S').
df_boletero.filter("repartodigital == ' '").show(5)

+----------+--------------+
|suministro|repartodigital|
+----------+--------------+
|   2697954|              |
+----------+--------------+



## Reemplazamos el valor en blanco por un valor correcto

In [181]:
# Patch the one record whose repartodigital flag is blank, setting it to 'N'.
# The overwrite is limited to that specific suministro AND only while its flag
# is actually blank, so a valid 'N'/'S' value for the same id (e.g. after the
# input files are refreshed) is never clobbered.
BLANK_FLAG_SUMINISTRO = 2697954

is_blank_flag_record = (
    (pyspfunct.col("suministro") == BLANK_FLAG_SUMINISTRO)
    # trim() reduces the blank-space flag to '' so the comparison also covers an empty string
    & (pyspfunct.trim(pyspfunct.col("repartodigital")) == "")
)

df_boletero = df_boletero.withColumn(
    "repartodigital",
    pyspfunct.when(is_blank_flag_record, 'N')
             .otherwise(pyspfunct.col("repartodigital")),  # every other row keeps its value
)

In [182]:
# Re-run the frequency check to confirm that no blank flag remains.
df_boletero.groupBy("repartodigital").count().show()

+--------------+-----+
|repartodigital|count|
+--------------+-----+
|             N|80169|
|             S|   27|
+--------------+-----+

