# Transformación a Parquet

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
#import glob

In [2]:
#Crear sesión de spark
spk = SparkSession.builder \
    .config("spark.driver.port", "4040") \
    .config("spark.local.dir", "/run/media/hugo/06368a0d-700b-4ac5-9159-173c295dcaed/") \
    .config("spark.hadoop.fs.defaultFS", "file:///run/media/hugo/06368a0d-700b-4ac5-9159-173c295dcaed/") \
    .master("local[*]") \
    .appName("EDAReview") \
    .getOrCreate()
spk.sparkContext.setLogLevel("ERROR")


24/11/05 14:09:56 WARN Utils: Your hostname, hugo-81we resolves to a loopback address: 127.0.1.1; using 10.201.213.244 instead (on interface wlp0s20f3)
24/11/05 14:09:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/05 14:09:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/05 14:09:56 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
24/11/05 14:09:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Carga de datos

In [5]:
files = glob.glob('./reviews-estados/**/*.json')

In [6]:
df = spk.read.json(files)

                                                                                

In [3]:
#Leer y cargar los dataset en un dataframe spark
df = spk.read.parquet('file:///run/media/hugo/06368a0d-700b-4ac5-9159-173c295dcaed/Google/review')

                                                                                

In [4]:
#Imprimir esquema de datos
df.printSchema()

root
 |-- gmap_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- pics: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- url: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- rating: long (nullable = true)
 |-- resp: struct (nullable = true)
 |    |-- text: string (nullable = true)
 |    |-- time: long (nullable = true)
 |-- text: string (nullable = true)
 |-- time: long (nullable = true)
 |-- user_id: string (nullable = true)



In [5]:
#Contar cantidad de filas del df
df.count()

                                                                                

89946359

In [6]:
# Filtrar no Nulos de 'pics' para ver contenido
df.filter(df['pics'].isNotNull()).show(5)

                                                                                

+--------------------+--------------+--------------------+------+----+--------------------+-------------+--------------------+
|             gmap_id|          name|                pics|rating|resp|                text|         time|             user_id|
+--------------------+--------------+--------------------+------+----+--------------------+-------------+--------------------+
|0x8752f569a04d64b...|     Mc Family|[{[https://lh5.go...|     5|NULL|I owe the success...|1614273754022|11356053341921870...|
|0x87528440b7ee298...|Sherri Narvaez|[{[https://lh5.go...|     1|NULL|See post from Mid...|1493078507343|10134158574946813...|
|0x874d907b753aa79...| Sarah M. Maya|[{[https://lh5.go...|     5|NULL|Was extremely gra...|1482624188733|10897238372292850...|
|0x87528a24ee73e5b...|     Tania Nay|[{[https://lh5.go...|     5|NULL|                NULL|1527482637204|11193326059038268...|
|0x80ca6ccb4e4a34f...|  Jacob Colvin|[{[https://lh5.go...|     5|NULL|Great building to...|1550358870272|116455

In [5]:
# Solo contiene las urls de los locales, se eliminará esta columna
df = df.drop('pics')
df.show(5)

                                                                                

+--------------------+----------------+------+--------------------+--------------------+-------------+--------------------+
|             gmap_id|            name|rating|                resp|                text|         time|             user_id|
+--------------------+----------------+------+--------------------+--------------------+-------------+--------------------+
|0x87528767d0ec0e4...|      Liz W Poch|     5|                NULL|Paige is the best...|1627085008811|11118259507767436...|
|0x87528767d0ec0e4...|Mario D'Ambrosio|     1|                NULL|I have updated my...|1626437133578|11768917084644824...|
|0x87528767d0ec0e4...| Kathleen Hambly|     5|                NULL|Simone is the bes...|1585165423975|11583468565499948...|
|0x87528767d0ec0e4...|   Crystal Olsen|     1|                NULL|Ive called severa...|1591629458998|11267961121373171...|
|0x87528767d0ec0e4...|    Sarah Jensen|     5|{Thank you for yo...|My fiance and I h...|1510264647735|10859942935026242...|
+-------

In [6]:
from pyspark.sql import functions as F

# Convertir la columna de milisegundos a fecha
df = df.withColumn('fecha', F.from_unixtime(F.col('time') / 1000).cast('timestamp'))

# Mostrar el DataFrame actualizado
df.show(5)

+--------------------+----------------+------+--------------------+--------------------+-------------+--------------------+-------------------+
|             gmap_id|            name|rating|                resp|                text|         time|             user_id|              fecha|
+--------------------+----------------+------+--------------------+--------------------+-------------+--------------------+-------------------+
|0x87528767d0ec0e4...|      Liz W Poch|     5|                NULL|Paige is the best...|1627085008811|11118259507767436...|2021-07-23 21:03:28|
|0x87528767d0ec0e4...|Mario D'Ambrosio|     1|                NULL|I have updated my...|1626437133578|11768917084644824...|2021-07-16 09:05:33|
|0x87528767d0ec0e4...| Kathleen Hambly|     5|                NULL|Simone is the bes...|1585165423975|11583468565499948...|2020-03-25 16:43:43|
|0x87528767d0ec0e4...|   Crystal Olsen|     1|                NULL|Ive called severa...|1591629458998|11267961121373171...|2020-06-08 12

In [7]:
#Convertir a minúsculas la columna de texto de review para facilitar la búsqueda
df = df.withColumn('text', F.lower(F.col('text')))

In [8]:
#Crear una columna de año para poder agrupar después
df = df.withColumn('año', F.year(F.col('fecha')))
df.show(5)

+--------------------+----------------+------+--------------------+--------------------+-------------+--------------------+-------------------+----+
|             gmap_id|            name|rating|                resp|                text|         time|             user_id|              fecha| año|
+--------------------+----------------+------+--------------------+--------------------+-------------+--------------------+-------------------+----+
|0x87528767d0ec0e4...|      Liz W Poch|     5|                NULL|paige is the best...|1627085008811|11118259507767436...|2021-07-23 21:03:28|2021|
|0x87528767d0ec0e4...|Mario D'Ambrosio|     1|                NULL|i have updated my...|1626437133578|11768917084644824...|2021-07-16 09:05:33|2021|
|0x87528767d0ec0e4...| Kathleen Hambly|     5|                NULL|simone is the bes...|1585165423975|11583468565499948...|2020-03-25 16:43:43|2020|
|0x87528767d0ec0e4...|   Crystal Olsen|     1|                NULL|ive called severa...|1591629458998|1126

In [11]:
df.filter(df['user_id'].isNull()).count()

                                                                                

0

In [12]:
#Comprobar nulos en la columna de fecha
df.filter(df['time'].isNull()).count()

                                                                                

0

In [13]:
#Comprobar nulos en la columna de texto de review
df.filter(df['text'].isNull()).count()

                                                                                

39307744

In [9]:
df = df.fillna({'text': 'nd'})

In [10]:
#Crear columna id_rev combinando los datos de user_id y time
df = df.withColumn('id_rev', F.concat(F.col('user_id'),F.lit('_'), F.col('time')))


In [16]:
#Creando df de los datos agrupados por id_rev que aparecen más de una vez
df_duplicados = df.groupBy('id_rev').count().filter(F.col('count') > 1)


In [17]:
# Obtener una muestra de 5 datos de la columna 'id_rev' del df de duplicados
muestras = df_duplicados.select("id_rev").rdd.map(lambda row: row[0]).takeSample(False, 5)

                                                                                

In [18]:
# Filtrar el DataFrame para controlar los datos duplicados y comprobar que sean duplicados
df_muestras = df.filter(df['id_rev'].isin(muestras))

# Mostrar el DataFrame filtrado
df_muestras.show()



+--------------------+-------------+------+----+--------------------+-------------+--------------------+-------------------+----+--------------------+
|             gmap_id|         name|rating|resp|                text|         time|             user_id|              fecha| año|              id_rev|
+--------------------+-------------+------+----+--------------------+-------------+--------------------+-------------------+----+--------------------+
|0x8814ca00028f51d...|     Hai Tran|     4|NULL|               goods|1502148317389|11543514572841014...|2017-08-07 20:25:17|2017|11543514572841014...|
|0x8814ca00028f51d...|     Hai Tran|     4|NULL|               goods|1502148317389|11543514572841014...|2017-08-07 20:25:17|2017|11543514572841014...|
|0x87cd266c656b682...|Charles Stitz|     5|NULL|everything is gre...|1562273141132|10777154252087485...|2019-07-04 17:45:41|2019|10777154252087485...|
|0x87cd266c656b682...|Charles Stitz|     5|NULL|everything is gre...|1562273141132|10777154252

                                                                                

In [11]:
#Eliminar Duplicados
df = df.dropDuplicates(["id_rev"])

In [20]:
#Escribir nuevos archivos parquet con datos almacenados
df_muestras = None
df_duplicados = None

In [23]:
# Dividir el DataFrame en 3 partes (ajusta las proporciones según lo necesites)
df_parts = df.randomSplit([0.3, 0.3, 0.4], seed=42)

In [24]:
# Guardar cada parte en un archivo Parquet separado
for i, part in enumerate(df_parts):
    part.write.parquet(f'reviewETL/part_{i}.parquet')




Py4JJavaError: An error occurred while calling o132.parquet.
: java.net.ConnectException: Call From hugo-81we/127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException: Conexión rehusada; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
	at java.base/jdk.internal.reflect.DirectConstructorHandleAccessor.newInstance(DirectConstructorHandleAccessor.java:62)
	at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Constructor.java:502)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:486)
	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:913)
	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:828)
	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1616)
	at org.apache.hadoop.ipc.Client.call(Client.java:1558)
	at org.apache.hadoop.ipc.Client.call(Client.java:1455)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:242)
	at org.apache.hadoop.ipc.ProtobufRpcEngine2$Invoker.invoke(ProtobufRpcEngine2.java:129)
	at jdk.proxy2/jdk.proxy2.$Proxy46.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:965)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at jdk.proxy2/jdk.proxy2.$Proxy47.getFileInfo(Unknown Source)
	at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1739)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1753)
	at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1750)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1765)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1760)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:120)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:869)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:391)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:364)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:802)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1570)
Caused by: java.net.ConnectException: Conexión rehusada
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:682)
	at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:1060)
	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:205)
	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:586)
	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:711)
	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:833)
	at org.apache.hadoop.ipc.Client$Connection.access$3800(Client.java:414)
	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1677)
	at org.apache.hadoop.ipc.Client.call(Client.java:1502)
	... 62 more


In [12]:
df.write.option("compression", "snappy").parquet('reviewETL')

                                                                                