In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import pyspark

# Print the installed PySpark version so the run environment is recorded in the notebook.
print(pyspark.__version__)

3.5.0


In [2]:
# Obtain the Spark session for this notebook under the application name
# "GroupCSVData"; getOrCreate() hands back an existing session if one is active.
session_builder = SparkSession.builder.appName("GroupCSVData")
spark = session_builder.getOrCreate()


In [3]:
# Path to the events CSV file
csv_file_path = "../data/events.csv.gz"

# Explicit column types, matching what schema inference reports for this file.
# Declaring them up front avoids the extra full pass over the data that
# inferSchema=True requires, and pins id_source / id_destination as strings so
# identifiers such as "025" can never be coerced to numbers.
# `hour` and `date` are back-quoted because they are also SQL type/function names.
events_schema = (
    "`hour` INT, calls INT, seconds INT, sms INT, `date` INT, region INT, "
    "id_source STRING, id_destination STRING"
)

# Read the CSV file into a DataFrame (first line is the header row)
df_eventos = spark.read.csv(csv_file_path, header=True, schema=events_schema)

# Show the first few rows of the DataFrame
df_eventos.show()


+----+-----+-------+---+--------+------+---------+--------------+
|hour|calls|seconds|sms|    date|region|id_source|id_destination|
+----+-----+-------+---+--------+------+---------+--------------+
|  11|    1|     24|  0|20211001|     5|      BF3|           374|
|   1|    1|     51|  0|20211001|     4|      9F5|           374|
|  11|    1|      3|  0|20211001|     6|      025|           374|
|  10|    1|     36|  0|20211001|     5|      FB6|           D52|
|  23|    4|    137|  0|20211001|     8|      4BB|           861|
|  18|    0|      0|  1|20211001|     4|      90C|           5B0|
|  13|    1|    618|  0|20211001|     9|      7AB|           4CA|
|  16|    1|    172|  0|20211001|     9|      7AB|           4CA|
|   6|    1|    208|  0|20211001|     9|      7AB|           4CA|
|   5|    1|     66|  0|20211001|     9|      7AB|           4CA|
|  18|    1|    135|  0|20211001|     9|      7AB|           4CA|
|  12|    0|      0|  8|20211001|     9|      0A4|           465|
|  21|    

In [4]:
# Print the DataFrame's schema (column names, types and nullability)
df_eventos.printSchema()

root
 |-- hour: integer (nullable = true)
 |-- calls: integer (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- sms: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- region: integer (nullable = true)
 |-- id_source: string (nullable = true)
 |-- id_destination: string (nullable = true)



In [5]:
# Count the rows currently in df_eventos.
df_eventos.count()

1000000

In [7]:
from pyspark.sql.functions import col, sum

# Per-column null tally: each null is flagged as 1 (non-null as 0) and the
# flags are added up, giving a single row with one null count per column.
# NOTE: `sum` here is pyspark.sql.functions.sum imported in this cell, which
# shadows Python's builtin sum.
null_count_exprs = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df_eventos.columns
]
nulos_por_columna = df_eventos.select(*null_count_exprs)
nulos_por_columna.show()


+----+-----+-------+---+----+------+---------+--------------+
|hour|calls|seconds|sms|date|region|id_source|id_destination|
+----+-----+-------+---+----+------+---------+--------------+
|   0|    0|      0|  0|   0|     0|       18|            15|
+----+-----+-------+---+----+------+---------+--------------+



In [8]:
# Remove records that have a null in "id_source" or "id_destination",
# then report how many rows remain.
required_id_columns = ["id_source", "id_destination"]
df_eventos = df_eventos.dropna(how="any", subset=required_id_columns)
df_eventos.count()


999970

In [9]:
# Path to the CSV file listing the free SMS destination ids
csv_file_path = "../data/free_sms_destinations.csv.gz"

# Read the CSV file into a DataFrame.
# The single `id` column is declared as a string instead of being inferred:
# these ids are joined against the string column id_destination, and inference
# would turn all-digit ids (e.g. "048") into integers if the file ever held
# only such values. An explicit schema also skips the inference pass.
df_free_ids = spark.read.csv(csv_file_path, header=True, schema="id STRING")

# Show the first few rows of the DataFrame
df_free_ids.show()

+---+
| id|
+---+
|374|
|D52|
|861|
|5B0|
|4CA|
|465|
|048|
|D5B|
|FD6|
|2D3|
|B3F|
|6AF|
|216|
|C3A|
|EE2|
|328|
|99E|
|BF6|
|60F|
|6AC|
+---+
only showing top 20 rows



In [27]:
# Left join on the "id_destination" column: every event is kept, and `id` is
# NULL when the event's destination does not appear in df_free_ids.
# The lookup ids are de-duplicated first: a repeated id would match the same
# event more than once and multiply its rows, inflating any per-source counts
# computed from joined_df.
free_ids_unique = df_free_ids.dropDuplicates(["id"])
joined_df = df_eventos.join(
    free_ids_unique,
    df_eventos["id_destination"] == free_ids_unique["id"],
    how="left",
)
joined_df.show()


+----+-----+-------+---+--------+------+---------+--------------+----+
|hour|calls|seconds|sms|    date|region|id_source|id_destination|  id|
+----+-----+-------+---+--------+------+---------+--------------+----+
|   8|    1|    189|  0|20211001|     7|      395|           A7C|NULL|
|   9|    1|    636|  0|20211001|     7|      D61|           A7C|NULL|
|  19|    1|    751|  0|20211001|     7|      8C1|           A7C|NULL|
|  17|    1|   6386|  0|20211001|     7|      316|           A7C|NULL|
|  20|    1|   6504|  0|20211001|     7|      BDB|           A7C|NULL|
|  20|    1|   1972|  0|20211001|     7|      CC7|           A7C|NULL|
|  18|    1|     81|  0|20211001|     7|      3F6|           0CC|NULL|
|   9|    1|     18|  0|20211001|     7|      88F|           260|NULL|
|  17|    2|     51|  0|20211001|     5|      80A|           260|NULL|
|  10|    1|     87|  0|20211001|     7|      FFD|           260|NULL|
|  18|    1|     19|  0|20211001|     7|      DF4|           260|NULL|
|  20|

In [None]:
# Number of joined rows per id_source, exposed in a column named "count".
rows_by_source = joined_df.groupBy("id_source")
df_facturacion = rows_by_source.count()