## Instalando las dependencias necesarias

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

#### Definiendo variables de entorno

In [2]:
import os

# Point JAVA_HOME at the Java 8 install and SPARK_HOME at the Spark 3.1.1
# distribution unpacked under /content.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [3]:
# List the working directory (the recorded output shows the Spark archive and its extracted folder).
!ls

sample_data  spark-3.1.1-bin-hadoop3.2	spark-3.1.1-bin-hadoop3.2.tgz


In [4]:
# Install PySpark from PyPI.
# NOTE(review): the version is not pinned; the recorded output shows 3.4.1
# being installed while SPARK_HOME points at a Spark 3.1.1 distribution.
# Consider pinning (e.g. pyspark==3.1.1) or dropping this cell; TODO confirm
# which PySpark is actually imported after findspark.init().
! pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=a00c610701a8b6e79af750aec457751cefd2ab95aa4d5b4eaa9c04c8ea0b8f6e
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [5]:
# findspark.init() is called before pyspark is imported; presumably it makes
# the distribution referenced by SPARK_HOME importable (see the findspark docs).
import findspark
findspark.init()
from pyspark.sql import SparkSession

# Get or create a session, passing "local[*]" as the master.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# Property used to format output tables better
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

## Trabajar con datos

In [6]:
# Show the documentation of a PySpark function
help(spark.read.text)

Help on method text in module pyspark.sql.readwriter:

text(paths, wholetext=False, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads text files and returns a :class:`DataFrame` whose schema starts with a
    string column named "value", and followed by partitioned columns if there
    are any.
    The text files must be encoded as UTF-8.
    
    By default, each line in the text file is a new row in the resulting DataFrame.
    
    .. versionadded:: 1.6.0
    
    Parameters
    ----------
    paths : str or list
        string, or list of strings, for input path(s).
    wholetext : str or bool, optional
        if true, read each file from input path(s) as a single row.
    lineSep : str, optional
        defines the line separator that should be used for parsing. If None is
        set, it covers all ``\r``, ``\r\n`` and ``\n``.
    pathGlobFilter : str or bool, 

In [9]:
# Mount Google Drive at /content/drive; the data files read below live under that path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [10]:
# Read the whole file as a single row and print it without truncating the text
spark.read.text('/content/drive/MyDrive/retail_db/schemas.json', wholetext=True).show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [13]:
# Read the whole file as a single row and keep that row's text in a Python
# variable, then display it.
schema_text = (
    spark.read
    .text('/content/drive/MyDrive/retail_db/schemas.json', wholetext=True)
    .first()
    .value
)

schema_text

'{\r\n    "departments": [\r\n        {\r\n            "column_name": "department_id",\r\n            "data_type": "integer",\r\n            "column_position": 1\r\n        },\r\n        {\r\n            "column_name": "department_name",\r\n            "data_type": "string",\r\n            "column_position": 2\r\n        }\r\n    ],\r\n    "categories": [\r\n        {\r\n            "column_name": "category_id",\r\n            "data_type": "integer",\r\n            "column_position": 1\r\n        },\r\n        {\r\n            "column_name": "category_department_id",\r\n            "data_type": "integer",\r\n            "column_position": 2\r\n        },\r\n        {\r\n            "column_name": "category_name",\r\n            "data_type": "string",\r\n            "column_position": 3\r\n        }\r\n    ],\r\n    "orders": [\r\n        {\r\n            "column_name": "order_id",\r\n            "data_type": "integer",\r\n            "column_position": 1\r\n        },\r\n        {\r\n 

In [14]:
# Check the Python type of schema_text (the recorded output is str)
type(schema_text)

str

In [15]:
# Python library used to handle JSON content
import json

In [16]:
# Parse the JSON text and keep the entry stored under the 'orders' key
# (the recorded output shows a list with one dict per column).
column_details = json.loads(schema_text)['orders']
column_details

[{'column_name': 'order_id', 'data_type': 'integer', 'column_position': 1},
 {'column_name': 'order_date', 'data_type': 'string', 'column_position': 2},
 {'column_name': 'order_customer_id',
  'data_type': 'timestamp',
  'column_position': 3},
 {'column_name': 'order_status', 'data_type': 'string', 'column_position': 4}]

In [17]:
# Sort the column entries by their 'column_position' value
sorted(column_details, key=lambda col: col['column_position'])

[{'column_name': 'order_id', 'data_type': 'integer', 'column_position': 1},
 {'column_name': 'order_date', 'data_type': 'string', 'column_position': 2},
 {'column_name': 'order_customer_id',
  'data_type': 'timestamp',
  'column_position': 3},
 {'column_name': 'order_status', 'data_type': 'string', 'column_position': 4}]

In [18]:
# Extract the column names from the dicts, in 'column_position' order
columns = [info['column_name'] for info in sorted(column_details, key=lambda col: col['column_position'])]

In [19]:
# Show the documentation of the CSV reader
help(spark.read.csv)

Help on method csv in module pyspark.sql.readwriter:

csv(path, schema=None, sep=None, encoding=None, quote=None, escape=None, comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None, maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None, columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None, samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None, pathGlobFilter=None, recursiveFileLookup=None, modifiedBefore=None, modifiedAfter=None, unescapedQuoteHandling=None) method of pyspark.sql.readwriter.DataFrameReader instance
    Loads a CSV file and returns the result as a  :class:`DataFrame`.
    
    This function will go through the input once to determine the input schema if
    ``inferSchema`` is enabled. To avoid going through the entire data once, di

In [20]:
# Read the CSV data with inferSchema=True, then rename the columns with the
# names collected in `columns`.
orders = (
    spark.read
    .csv('/content/drive/MyDrive/retail_db/orders', inferSchema=True)
    .toDF(*columns)
)

In [21]:
from pyspark.sql.functions import count, col

In [22]:
# Aggregation example: count the orders per order_status and show the
# statuses from the largest count to the smallest.
(
    orders
    .groupBy('order_status')
    .agg(count('*').alias('order_count'))
    .orderBy(col('order_count').desc())
    .show()
)

+---------------+-----------+
|   order_status|order_count|
+---------------+-----------+
|       COMPLETE|      22899|
|PENDING_PAYMENT|      15030|
|     PROCESSING|       8275|
|        PENDING|       7610|
|         CLOSED|       7556|
|        ON_HOLD|       3798|
|SUSPECTED_FRAUD|       1558|
|       CANCELED|       1428|
| PAYMENT_REVIEW|        729|
+---------------+-----------+



In [24]:
# Helper that extracts a table's column names from the schemas JSON file
def get_columns(schemas_file, table_name:str):
  """Return the column names of ``table_name`` ordered by ``column_position``.

  The file at ``schemas_file`` is read through the notebook-level ``spark``
  session as a single text row and parsed as JSON. The entry stored under
  ``table_name`` is iterated as a sequence of dicts, each providing the
  ``column_name`` and ``column_position`` keys.
  """
  raw_json = spark.read.text(schemas_file, wholetext=True).first().value
  table_spec = json.loads(raw_json)[table_name]
  spec_by_position = sorted(table_spec, key=lambda entry: entry['column_position'])
  return [entry['column_name'] for entry in spec_by_position]

In [25]:
# Try the helper on the 'orders' table
get_columns('/content/drive/MyDrive/retail_db/schemas.json', 'orders')

['order_id', 'order_date', 'order_customer_id', 'order_status']

In [35]:
# Names of the tables to load
tables_list = [
    'departments', 'categories', 'products',
    'customers', 'orders', 'order_items'
]

In [36]:
# Base directory of the retail_db files under the /content/drive path
base_dir = '/content/drive/MyDrive/retail_db'

In [38]:
# Convert every table named in tables_list from CSV to Parquet.
# Loop-local names (table_columns, table_df) are used so that the notebook-level
# `columns` and `df` variables are not overwritten by this cell.
for t in tables_list:
  print(f'Processing {t} data')
  # Column names for this table, ordered by their position in schemas.json
  table_columns = get_columns(f'{base_dir}/schemas.json', t)

  # Read the table's CSV data (inferSchema=True) and assign the column names
  table_df = spark.read.csv(f'{base_dir}/{t}', inferSchema=True).toDF(*table_columns)

  # Write the table as Parquet under base_dir, overwriting any previous output;
  # the path is built from base_dir just like the input paths above
  table_df.write.mode('overwrite').parquet(f'{base_dir}/parquet_db/{t}')



Processing departments data
Processing categories data
Processing products data
Processing customers data
Processing orders data
Processing order_items data


In [41]:
# List the Parquet output directory.
# NOTE(review): -r lists the entries in reverse order (as the recorded output
# shows); if a recursive listing was intended, that flag is -R. Confirm.
! ls -r /content/drive/MyDrive/retail_db/parquet_db

products  orders  order_items  departments  customers  categories


# Using Spark DataFrames

In [44]:
# Read the CSV data, setting the column names and data types through the schema argument
df = spark.read.csv(f'{base_dir}/orders', schema='order_id INT, order_date DATE, order_customer_id INT, order_status STRING')

In [45]:
# Select all columns
df.select('*')

order_id,order_date,order_customer_id,order_status
1,2013-07-25,11599,CLOSED
2,2013-07-25,256,PENDING_PAYMENT
3,2013-07-25,12111,COMPLETE
4,2013-07-25,8827,CLOSED
5,2013-07-25,11318,COMPLETE
6,2013-07-25,7130,COMPLETE
7,2013-07-25,4530,COMPLETE
8,2013-07-25,2911,PROCESSING
9,2013-07-25,5657,PENDING_PAYMENT
10,2013-07-25,5648,PENDING_PAYMENT


In [46]:
# Select only two columns
df.select('order_id', 'order_date')

order_id,order_date
1,2013-07-25
2,2013-07-25
3,2013-07-25
4,2013-07-25
5,2013-07-25
6,2013-07-25
7,2013-07-25
8,2013-07-25
9,2013-07-25
10,2013-07-25


In [49]:
# Select the distinct values of order_status
df.select('order_status').distinct()

order_status
PENDING_PAYMENT
COMPLETE
ON_HOLD
PAYMENT_REVIEW
PROCESSING
CLOSED
SUSPECTED_FRAUD
PENDING
CANCELED


In [50]:
# Select the distinct values of order_status and sort them
df.select('order_status').distinct().orderBy('order_status')

order_status
CANCELED
CLOSED
COMPLETE
ON_HOLD
PAYMENT_REVIEW
PENDING
PENDING_PAYMENT
PROCESSING
SUSPECTED_FRAUD


In [53]:
# Count the rows of the result, i.e. the number of distinct order_status values
df.select('order_status').distinct().orderBy('order_status').count()

9

In [54]:
# Drop a column (the result is only displayed; df is not reassigned)
df.drop('order_id')

order_date,order_customer_id,order_status
2013-07-25,11599,CLOSED
2013-07-25,256,PENDING_PAYMENT
2013-07-25,12111,COMPLETE
2013-07-25,8827,CLOSED
2013-07-25,11318,COMPLETE
2013-07-25,7130,COMPLETE
2013-07-25,4530,COMPLETE
2013-07-25,2911,PROCESSING
2013-07-25,5657,PENDING_PAYMENT
2013-07-25,5648,PENDING_PAYMENT


In [66]:
# Import Spark column functions and data-type classes.
# NOTE(review): the type classes are not referenced in the cells visible here.
from pyspark.sql.functions import date_format, col
from pyspark.sql.types import IntegerType,BooleanType,DateType

In [72]:
# Use Spark column functions: format order_date as 'yyyyMM', cast the result
# to int, and select it next to order_id and order_date.
# NOTE(review): the alias 'order_mount' looks like a typo for 'order_month';
# confirm before renaming, since the alias is the output column name.
df.select(
    'order_id',
    'order_date',
    date_format(col('order_date'), 'yyyyMM').cast('int').alias('order_mount'),
)

order_id,order_date,order_mount
1,2013-07-25,201307
2,2013-07-25,201307
3,2013-07-25,201307
4,2013-07-25,201307
5,2013-07-25,201307
6,2013-07-25,201307
7,2013-07-25,201307
8,2013-07-25,201307
9,2013-07-25,201307
10,2013-07-25,201307


In [73]:
# Replace order_date with its 'yyyyMM' value cast to int (the result is only displayed; df is not reassigned)
df.withColumn('order_date',  date_format(col('order_date'),'yyyyMM').cast('int'))

order_id,order_date,order_customer_id,order_status
1,201307,11599,CLOSED
2,201307,256,PENDING_PAYMENT
3,201307,12111,COMPLETE
4,201307,8827,CLOSED
5,201307,11318,COMPLETE
6,201307,7130,COMPLETE
7,201307,4530,COMPLETE
8,201307,2911,PROCESSING
9,201307,5657,PENDING_PAYMENT
10,201307,5648,PENDING_PAYMENT
