<a href="https://colab.research.google.com/github/luisosmx/spark/blob/main/inegi_influx_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar -xzf spark-3.1.2-bin-hadoop3.2.tgz
!mv spark-3.1.2-bin-hadoop3.2.tgz /opt/spark-3.1.2
!pip install -q findspark
!pip install pyspark==3.1.2 
!ln -s /opt/spark-3.1.2 /opt/spark
!export SPARK_HOME=/opt/spark
!export PATH=$SPARK_HOME/bin:$PATH

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
ln: failed to create symbolic link '/opt/spark': File exists


In [2]:
!pip install pyspark==3.1.2 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
pip freeze

In [3]:
import os
import findspark

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.1.2-bin-hadoop3.2'
#os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'

findspark.init()

sc = pyspark.SparkContext("local[*]")
spark = SparkSession(sc)

print('Modules imported and Spark loaded')

Modules imported and Spark loaded


# Loading data into PySpark

Getting data from Github repo without clonning the project, just using [raw.githubusercontent.com](https://stackoverflow.com/questions/39065921/what-do-raw-githubusercontent-com-urls-represent)

In [4]:
!wget --continue /content/afluenciastc_simple_01_2023.csv

/content/afluenciastc_simple_01_2023.csv: Scheme missing.


Reading the file with a Spark dataframe

In [5]:
spark_df = spark.read.csv('/content/afluenciastc_simple_01_2023.csv')
spark_df.show(10)



+----------+----+-----+-------+-------------------+---------+
|       _c0| _c1|  _c2|    _c3|                _c4|      _c5|
+----------+----+-----+-------+-------------------+---------+
|     fecha|anio|  mes|  linea|           estacion|afluencia|
|2010-01-01|2010|Enero|Linea 1|           Zaragoza|    20227|
|2010-01-01|2010|Enero|Linea 1| Isabel la Católica|     6487|
|2010-01-01|2010|Enero|Linea 1|          Moctezuma|    10304|
|2010-01-01|2010|Enero|Linea 1|        Pino Suárez|     8679|
|2010-01-01|2010|Enero|Linea 1|       Gómez Farías|    19499|
|2010-01-01|2010|Enero|Linea 6|Deptvo. 18 de Marzo|      621|
|2010-01-01|2010|Enero|Linea 6|  La Villa-Basilica|    24792|
|2010-01-01|2010|Enero|Linea 9|          Pantitlán|    27000|
|2010-01-01|2010|Enero|Linea 8|             Aculco|     3652|
+----------+----+-----+-------+-------------------+---------+
only showing top 10 rows



In [6]:
df = spark.read.option("header",True).csv('/content/afluenciastc_simple_01_2023.csv')
df.show(20)

+----------+----+-----+-------+-------------------+---------+
|     fecha|anio|  mes|  linea|           estacion|afluencia|
+----------+----+-----+-------+-------------------+---------+
|2010-01-01|2010|Enero|Linea 1|           Zaragoza|    20227|
|2010-01-01|2010|Enero|Linea 1| Isabel la Católica|     6487|
|2010-01-01|2010|Enero|Linea 1|          Moctezuma|    10304|
|2010-01-01|2010|Enero|Linea 1|        Pino Suárez|     8679|
|2010-01-01|2010|Enero|Linea 1|       Gómez Farías|    19499|
|2010-01-01|2010|Enero|Linea 6|Deptvo. 18 de Marzo|      621|
|2010-01-01|2010|Enero|Linea 6|  La Villa-Basilica|    24792|
|2010-01-01|2010|Enero|Linea 9|          Pantitlán|    27000|
|2010-01-01|2010|Enero|Linea 8|             Aculco|     3652|
|2010-01-01|2010|Enero|Linea 9|          Velódromo|     3239|
|2010-01-01|2010|Enero|Linea 5|Autobuses del Norte|    16824|
|2010-01-01|2010|Enero|Linea 5|          Misterios|     3513|
|2010-01-01|2010|Enero|Linea 7|     Constituyentes|     1417|
|2010-01

In [7]:
spark.read.option("header",True) \
          .csv("/content/afluenciastc_simple_01_2023.csv") \
          .createOrReplaceTempView("influx_file")

In [8]:
spark.sql("SELECT * FROM influx_file").show(10)

+----------+----+-----+-------+-------------------+---------+
|     fecha|anio|  mes|  linea|           estacion|afluencia|
+----------+----+-----+-------+-------------------+---------+
|2010-01-01|2010|Enero|Linea 1|           Zaragoza|    20227|
|2010-01-01|2010|Enero|Linea 1| Isabel la Católica|     6487|
|2010-01-01|2010|Enero|Linea 1|          Moctezuma|    10304|
|2010-01-01|2010|Enero|Linea 1|        Pino Suárez|     8679|
|2010-01-01|2010|Enero|Linea 1|       Gómez Farías|    19499|
|2010-01-01|2010|Enero|Linea 6|Deptvo. 18 de Marzo|      621|
|2010-01-01|2010|Enero|Linea 6|  La Villa-Basilica|    24792|
|2010-01-01|2010|Enero|Linea 9|          Pantitlán|    27000|
|2010-01-01|2010|Enero|Linea 8|             Aculco|     3652|
|2010-01-01|2010|Enero|Linea 9|          Velódromo|     3239|
+----------+----+-----+-------+-------------------+---------+
only showing top 10 rows



In [46]:
df_colum = spark.sql("""
  SELECT 
    fecha AS date
    , anio AS year
    , mes AS month
    , linea AS line
    , estacion AS station
    , afluencia AS influx
  FROM influx_file
""")

aqui se define el df como df_colum

In [47]:
df_colum.show(10)

+----------+----+-----+-------+-------------------+------+
|      date|year|month|   line|            station|influx|
+----------+----+-----+-------+-------------------+------+
|2010-01-01|2010|Enero|Linea 1|           Zaragoza| 20227|
|2010-01-01|2010|Enero|Linea 1| Isabel la Católica|  6487|
|2010-01-01|2010|Enero|Linea 1|          Moctezuma| 10304|
|2010-01-01|2010|Enero|Linea 1|        Pino Suárez|  8679|
|2010-01-01|2010|Enero|Linea 1|       Gómez Farías| 19499|
|2010-01-01|2010|Enero|Linea 6|Deptvo. 18 de Marzo|   621|
|2010-01-01|2010|Enero|Linea 6|  La Villa-Basilica| 24792|
|2010-01-01|2010|Enero|Linea 9|          Pantitlán| 27000|
|2010-01-01|2010|Enero|Linea 8|             Aculco|  3652|
|2010-01-01|2010|Enero|Linea 9|          Velódromo|  3239|
+----------+----+-----+-------+-------------------+------+
only showing top 10 rows



#Replace months by numbers

Aqui no estoy creando un df, aqui estoy generando una vista de un df

In [48]:
df_colum.createOrReplaceTempView("df_months")

In [49]:
df_months_filter = spark.sql("""
  SELECT
      CASE 
        WHEN month = 'Enero' THEN '01'
        WHEN month = 'Febrero' THEN '02'
        WHEN month = 'Marzo' THEN '03'
        WHEN month = 'Abril' THEN '04'
        WHEN month = 'Mayo' THEN '05'
        WHEN month = 'Junio' THEN '06'
        WHEN month = 'Julio' THEN '07'
        WHEN month = 'Agosto' THEN '08'
        WHEN month = 'Septiembre' THEN '09'
        WHEN month = 'Octubre' THEN '10'
        WHEN month = 'Noviembre' THEN '11'
        WHEN month = 'Diciembre' THEN '12'
        ELSE month
      END AS Month
  FROM df_months
""")

In [50]:
df_months_filter.show(10)

+-----+
|Month|
+-----+
|   01|
|   01|
|   01|
|   01|
|   01|
|   01|
|   01|
|   01|
|   01|
|   01|
+-----+
only showing top 10 rows



#Replace line by numbers

In [53]:
df_colum.createOrReplaceTempView("df_lines")

In [54]:
df_line_filter = spark.sql("""
  SELECT 
      CASE 
        WHEN line = 'Linea 1' THEN '1'
        WHEN line = 'Linea 2' THEN '2'
        WHEN line = 'Linea 3' THEN '3'
        WHEN line = 'Linea 4' THEN '4'
        WHEN line = 'Linea 5' THEN '5'
        WHEN line = 'Linea 6' THEN '6'
        WHEN line = 'Linea 7' THEN '7'
        WHEN line = 'Linea 8' THEN '8'
        WHEN line = 'Linea 9' THEN '9'
        WHEN line = 'Linea A' THEN 'A'
        WHEN line = 'Linea B' THEN 'B'
        WHEN line = 'Linea 12' THEN '12'
        ELSE line
      END AS Line
  FROM df_lines
""")

In [55]:
df_line_filter.show(10)

+----+
|Line|
+----+
|   1|
|   1|
|   1|
|   1|
|   1|
|   6|
|   6|
|   9|
|   8|
|   9|
+----+
only showing top 10 rows



#Sorting

In [59]:
df_colum.createOrReplaceTempView("df_station_accents")

In [60]:
df_station_filter = spark.sql("""
  SELECT 
    CASE 
      WHEN station LIKE '%á%' THEN REPLACE(station, 'á', 'a')
      WHEN station LIKE '%é%' THEN REPLACE(station, 'é', 'e')
      WHEN station LIKE '%í%' THEN REPLACE(station, 'í', 'i')
      WHEN station LIKE '%ó%' THEN REPLACE(station, 'ó', 'o')
      WHEN station LIKE '%ú%' THEN REPLACE(station, 'ú', 'u')
      WHEN station LIKE '%Á%' THEN REPLACE(station, 'Á', 'A')
      WHEN station LIKE '%É%' THEN REPLACE(station, 'É', 'E')
      WHEN station LIKE '%Í%' THEN REPLACE(station, 'Í', 'I')
      WHEN station LIKE '%Ó%' THEN REPLACE(station, 'Ó', 'O')
      WHEN station LIKE '%Ú%' THEN REPLACE(station, 'Ú', 'U')
      ELSE station
    END AS Station
  FROM df_station_accents
""")

In [62]:
df_station_filter.show(20)

+-------------------+
|            Station|
+-------------------+
|           Zaragoza|
| Isabel la Catolica|
|          Moctezuma|
|        Pino Suarez|
|       Gómez Farias|
|Deptvo. 18 de Marzo|
|  La Villa-Basilica|
|          Pantitlan|
|             Aculco|
|          Velodromo|
|Autobuses del Norte|
|          Misterios|
|     Constituyentes|
|          Refineria|
|            Etiopia|
|            Polanco|
|    Canal del Norte|
|          Bondojito|
|        Santa Anita|
|            Popotla|
+-------------------+
only showing top 20 rows

