In [1]:
from pyspark.sql import SparkSession, Row

from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

In [2]:
# Spark platform setup: obtain the active SparkSession, or create one
# registered under the application name "PythonPi".
spark = (
    SparkSession.builder
    .appName("PythonPi")
    .getOrCreate()
)

In [3]:
# Load the video-game sales data (vgsales.csv) into a Spark DataFrame.
# The first line of the file supplies the column names. No schema is given
# or inferred here, so the columns are read as strings.
path_archivo = r'C:\Users\Bodok\Downloads\EBAC curso\Modulo 50 - Big Data II/vgsales.csv'

df = spark.read.option("header", True).csv(path_archivo)

In [4]:
# Run SQL through spark.sql: register the DataFrame as a global temporary
# view, then total North-America and global sales per publisher, ordered by
# the third output column (TotalGlobal) from highest to lowest.
df.createOrReplaceGlobalTempView("VGSALES")
sql_str = (
    "select Publisher, round(sum(NA_Sales),2) as TotalNA, "
    "round(sum(Global_Sales),2) as TotalGlobal "
    "from global_temp.VGSALES "
    "group by Publisher "
    "order by 3 desc"
)
publisher_totals = spark.sql(sql_str)
publisher_totals.show()

+--------------------+-------+-----------+
|           Publisher|TotalNA|TotalGlobal|
+--------------------+-------+-----------+
|            Nintendo| 816.87|    1786.56|
|     Electronic Arts| 595.07|    1110.32|
|          Activision|  429.7|     727.46|
|Sony Computer Ent...| 265.22|      607.5|
|             Ubisoft| 253.43|     474.72|
|Take-Two Interactive| 220.49|     399.54|
|                 THQ| 208.77|     340.77|
|Konami Digital En...|  92.16|     283.64|
|                Sega|  109.4|     272.99|
|  Namco Bandai Games|  69.52|     254.09|
|Microsoft Game St...| 155.35|     245.79|
|              Capcom|  78.59|     200.89|
|               Atari| 110.04|     157.22|
|Warner Bros. Inte...|  81.18|     153.89|
|         Square Enix|  48.65|     145.18|
|Disney Interactiv...|  71.34|     119.96|
|   Eidos Interactive|  49.19|      98.98|
|           LucasArts|  49.97|      87.34|
|  Bethesda Softworks|  39.72|      82.14|
|        Midway Games|  45.46|      69.85|
+----------

In [5]:
# Show only the first 5 rows, truncating each column to at most 40 characters
spark.sql(sql_str).show(n=5, truncate=40)

+---------------------------+-------+-----------+
|                  Publisher|TotalNA|TotalGlobal|
+---------------------------+-------+-----------+
|                   Nintendo| 816.87|    1786.56|
|            Electronic Arts| 595.07|    1110.32|
|                 Activision|  429.7|     727.46|
|Sony Computer Entertainment| 265.22|      607.5|
|                    Ubisoft| 253.43|     474.72|
+---------------------------+-------+-----------+
only showing top 5 rows



In [6]:
# Show the same 5 rows vertically, one card-like record per row
spark.sql(sql_str).show(n=5, truncate=40, vertical=True)

-RECORD 0----------------------------------
 Publisher   | Nintendo                    
 TotalNA     | 816.87                      
 TotalGlobal | 1786.56                     
-RECORD 1----------------------------------
 Publisher   | Electronic Arts             
 TotalNA     | 595.07                      
 TotalGlobal | 1110.32                     
-RECORD 2----------------------------------
 Publisher   | Activision                  
 TotalNA     | 429.7                       
 TotalGlobal | 727.46                      
-RECORD 3----------------------------------
 Publisher   | Sony Computer Entertainment 
 TotalNA     | 265.22                      
 TotalGlobal | 607.5                       
-RECORD 4----------------------------------
 Publisher   | Ubisoft                     
 TotalNA     | 253.43                      
 TotalGlobal | 474.72                      
only showing top 5 rows



In [7]:
# Show the physical execution plan Spark builds for the aggregation query
spark.sql(sql_str).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [TotalGlobal#133 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(TotalGlobal#133 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=175]
      +- HashAggregate(keys=[Publisher#22], functions=[sum(cast(NA_Sales#23 as double)), sum(cast(Global_Sales#27 as double))])
         +- Exchange hashpartitioning(Publisher#22, 200), ENSURE_REQUIREMENTS, [plan_id=172]
            +- HashAggregate(keys=[Publisher#22], functions=[partial_sum(cast(NA_Sales#23 as double)), partial_sum(cast(Global_Sales#27 as double))])
               +- FileScan csv [Publisher#22,NA_Sales#23,Global_Sales#27] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/C:/Users/Bodok/Downloads/EBAC curso/Modulo 50 - Big Data II/vgsa..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Publisher:string,NA_Sales:string,Global_Sales:string>




In [8]:
# List the column names of the query result
spark.sql(sql_str).columns

['Publisher', 'TotalNA', 'TotalGlobal']

In [9]:
# Count the rows of the query result (one row per Publisher group)
spark.sql(sql_str).count()

579

## Partición de datos

In [10]:
# partitionBy demo: get (or create) a SparkSession under the application
# name 'Partitionby() PySpark'.
spark = (
    SparkSession.builder
    .appName('Partitionby() PySpark')
    .getOrCreate()
)

# Read the vgsales file into a DataFrame, taking column names from the header row
csv_reader = spark.read.option("header", True)
dataframe = csv_reader.csv(r'C:\Users\Bodok\Downloads\partition/vgsales.csv')

# Print the schema Spark assigned to the file's columns
dataframe.printSchema()

root
 |-- Rank: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Platform: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Publisher: string (nullable = true)
 |-- NA_Sales: string (nullable = true)
 |-- EU_Sales: string (nullable = true)
 |-- JP_Sales: string (nullable = true)
 |-- Other_Sales: string (nullable = true)
 |-- Global_Sales: string (nullable = true)



In [None]:
# Writing with partitionBy("Platform") produces one directory per platform
# under the output path; "overwrite" mode replaces any previous output there.
(
    dataframe.write
    .option("header", True)
    .partitionBy("Platform")
    .mode("overwrite")
    .csv("C:/Users/Bodok/Downloads/partition/platform")
)
# Alternative output location kept from the original notebook:
# r'C:\Users\Bodok\Downloads\EBAC curso\Modulo 50 - Big Data II\partition\platform'

In [18]:
# Print the version of the running Spark installation
print(spark.version)

3.5.2


## PyArrow

In [19]:
from pyarrow import csv
import pyarrow as pa

In [20]:
# Read the vgsales CSV with PyArrow's CSV reader into a pyarrow.Table.
# Note: `csv` here is pyarrow.csv, not the standard-library csv module.
archivo = r'C:\Users\Bodok\Downloads\partition/vgsales.csv'
tab_vgsales = csv.read_csv(archivo)

In [21]:
# Display vgsales as seen by PyArrow: the schema plus a preview of each column
tab_vgsales

pyarrow.Table
Rank: int64
Name: string
Platform: string
Year: int64
Genre: string
Publisher: string
NA_Sales: double
EU_Sales: double
JP_Sales: double
Other_Sales: double
Global_Sales: double
----
Rank: [[1,2,3,4,5,...,12815,12816,12817,12818,12819],[12820,12821,12822,12823,12824,...,16596,16597,16598,16599,16600]]
Name: [["Wii Sports","Super Mario Bros.","Mario Kart Wii","Wii Sports Resort","Pokemon Red/Pokemon Blue",...,"SeaBlade","Jikkyou Powerful Pro Yakyuu DreamCast Edition","Carmen Sandiego: The Secret of the Stolen Drums","Tokyo Mono Harashi: Karasu no Mori Gakuen Kitan","Activision Hits: Remixed"],["Death Jr. II: Root of Evil","Hail to the Chimp","MotoGP 4 - Official Game of MotoGP","Legend of Kay","Kyoukai Senjou no Horizon Portable",...,"Woody Woodpecker in Crazy Castle 5","Men in Black II: Alien Escape","SCORE International Baja 1000: The Official Game","Know How 2","Spirits & Spells"]]
Platform: [["Wii","NES","Wii","Wii","GB",...,"XB","DC","PS2","PSP","PSP"],["PSP","X360","

In [22]:
# Number of rows in tab_vgsales
len(tab_vgsales)

16598

In [23]:
# List the table's column names
tab_vgsales.column_names

['Rank',
 'Name',
 'Platform',
 'Year',
 'Genre',
 'Publisher',
 'NA_Sales',
 'EU_Sales',
 'JP_Sales',
 'Other_Sales',
 'Global_Sales']

In [24]:
# Show the table schema (column names and their Arrow types)
tab_vgsales.schema

Rank: int64
Name: string
Platform: string
Year: int64
Genre: string
Publisher: string
NA_Sales: double
EU_Sales: double
JP_Sales: double
Other_Sales: double
Global_Sales: double

In [25]:
# More dataset-description helpers: number of columns in the table
tab_vgsales.num_columns

11

In [26]:
# More dataset-description helpers: number of rows in the table
# (same value that len(tab_vgsales) reports)
tab_vgsales.num_rows

16598

In [27]:
# Convert the Arrow table to a pandas DataFrame.
# NOTE: this rebinds `df`, which previously held the Spark DataFrame read from the CSV.
df = tab_vgsales.to_pandas()

In [28]:
# Preview the first five rows of the pandas DataFrame
df.head()

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006.0,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985.0,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008.0,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009.0,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0
4,5,Pokemon Red/Pokemon Blue,GB,1996.0,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37


In [29]:
# Confirm that tab_vgsales is a PyArrow Table
type(tab_vgsales)

pyarrow.lib.Table

In [30]:
# Group the table by Genre and sum NA_Sales within each group
by_genre = tab_vgsales.group_by("Genre")
tab_genre = by_genre.aggregate([("NA_Sales", "sum")])

In [31]:
# Convert the per-genre aggregate to a pandas DataFrame for display
df1 = tab_genre.to_pandas()

In [32]:
# Preview the first rows of the per-genre NA_Sales totals
df1.head()

Unnamed: 0,Genre,NA_Sales_sum
0,Sports,683.35
1,Platform,447.05
2,Racing,359.42
3,Role-Playing,327.28
4,Puzzle,123.78


In [33]:
# Add a column with append_column: a string column named "Test" holding '0'
# for every row. append_column returns a new table; tab_vgsales is unchanged.
zero_flags = pa.array(['0'] * tab_vgsales.num_rows, type=pa.string())
tab2 = tab_vgsales.append_column("Test", zero_flags)
df2 = tab2.to_pandas()
df2.head()

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales,Test
0,1,Wii Sports,Wii,2006.0,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74,0
1,2,Super Mario Bros.,NES,1985.0,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24,0
2,3,Mario Kart Wii,Wii,2008.0,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82,0
3,4,Wii Sports Resort,Wii,2009.0,Sports,Nintendo,15.75,11.01,3.28,2.96,33.0,0
4,5,Pokemon Red/Pokemon Blue,GB,1996.0,Role-Playing,Nintendo,11.27,8.89,10.22,1.0,31.37,0


## Uso de Parquet

In [34]:
import pyarrow.parquet as pq
import pandas as pd
import pyarrow as pa

# Small example DataFrame (integer, string and boolean columns) indexed by
# the labels a, b, c; used below for the Parquet round trip.
columns_by_name = {
    'lin1': [-20, 100, 200],
    'lin2': ['este', 'es un', 'ejemplo'],
    'l3': [False, True, True],
}
df = pd.DataFrame(columns_by_name, index=['a', 'b', 'c'])

In [35]:
# Display the small example DataFrame
df

Unnamed: 0,lin1,lin2,l3
a,-20,este,False
b,100,es un,True
c,200,ejemplo,True


In [36]:
# Convert the example DataFrame to an Arrow table and write it out as a Parquet file.
# The pandas index is stored too: it appears as the __index_level_0__ column when read back.
tab_example = pa.Table.from_pandas(df)
pq.write_table(tab_example, r'C:\Users\Bodok\Downloads\EBAC curso\Modulo 50 - Big Data II\parquet/ejemplo.parquet')


In [37]:
# Read the Parquet file back into an Arrow table and convert it to pandas;
# the original index labels (a, b, c) come back with it.
tab_2 = pq.read_table(r'C:\Users\Bodok\Downloads\EBAC curso\Modulo 50 - Big Data II\parquet/ejemplo.parquet')
tab_2.to_pandas()

Unnamed: 0,lin1,lin2,l3
a,-20,este,False
b,100,es un,True
c,200,ejemplo,True


In [38]:
# Inspect the Arrow table read from Parquet, including the stored index column
tab_2

pyarrow.Table
lin1: int64
lin2: string
l3: bool
__index_level_0__: string
----
lin1: [[-20,100,200]]
lin2: [["este","es un","ejemplo"]]
l3: [[false,true,true]]
__index_level_0__: [["a","b","c"]]