# Importing Data into Hive Using PySpark




Data import

In [1]:
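# Download and unpack Spark, then install the Python helpers (Colab shell commands)
!wget -q https://dlcdn.apache.org/spark/spark-4.1.1/spark-4.1.1-bin-hadoop3.tgz
!tar xf spark-4.1.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark

import os
import findspark

# Point Spark at the JVM and at the unpacked distribution
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-4.1.1-bin-hadoop3"

# Make the pyspark package under SPARK_HOME importable
findspark.init()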
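
A common source of trouble in this kind of setup is an environment variable that points at a directory that does not exist, for example because the archive was unpacked somewhere else. The following is a minimal sketch of a pre-flight check; `missing_dirs` is a hypothetical helper written for this illustration, not part of findspark or Spark.

```python
import os

def missing_dirs(names, env=None):
    """Return the variable names that are unset or do not point at a directory."""
    env = os.environ if env is None else env
    return [n for n in names if not os.path.isdir(env.get(n, ""))]

# Hypothetical usage: run this before findspark.init()
problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"])
if problems:
    print("Check these variables:", ", ".join(problems))
```

An empty result means both variables resolve to existing directories; it does not prove that the directories contain a working JVM or Spark distribution.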



Reading a file with Spark: create the SparkSession first

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Learning_Spark") \
    .getOrCreate()

sc = spark.sparkContext

Read multiple files with Spark

In [3]:
# Named md_lines rather than input to avoid shadowing Python's built-in input()
md_lines = sc.textFile("spark-4.1.1-bin-hadoop3/*.md")
md_lines

spark-4.1.1-bin-hadoop3/*.md MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0
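
The output above only describes the RDD; nothing has been read yet, so it does not show which files the pattern matched. A small sketch for checking that locally with Python's `glob` module follows. It assumes the files are on the local filesystem, and `preview_matches` is a hypothetical helper. Spark resolves the pattern with Hadoop's glob rules, which are similar to Python's but not identical, so treat the result as a sanity check only.

```python
import glob
import os

def preview_matches(pattern):
    """Return the sorted local paths that match a glob pattern."""
    return sorted(glob.glob(pattern))

# Pattern taken from the cell above
for path in preview_matches("spark-4.1.1-bin-hadoop3/*.md"):
    print(path, os.path.getsize(path), "bytes")
```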

Read JSON

In [4]:
!wget https://raw.githubusercontent.com/hassanarteaga/CosmosDBJSONExamples/master/Example1.json

--2026-01-15 11:21:54--  https://raw.githubusercontent.com/hassanarteaga/CosmosDBJSONExamples/master/Example1.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2427 (2.4K) [text/plain]
Saving to: ‘Example1.json’


2026-01-15 11:21:54 (31.3 MB/s) - ‘Example1.json’ saved [2427/2427]



In [5]:
jsonDF = spark.read.option("multiline","true").json('Example1.json')
jsonDF.show()


+--------------------+--------+-------------+--------------------+--------------------+---------+--------------------+
|          activities|end_date|event_country|          event_name|                  id|init_date|             speaker|
+--------------------+--------+-------------+--------------------+--------------------+---------+--------------------+
|[{30, Using Cosmo...|20180615|       MEXICO|       CosmosDB Conf|6e3bed84-bd7e-4c6...| 20180613|Hassan Arteaga Ro...|
|[{30, In Vue we t...|20180503|          USA|    Vue.js Conf 2018|204831ad-d190-49f...| 20180501|Hassan Arteaga Ro...|
|[{30, CosmosDB in...|20180625|      GERMANY|           IoT Space|a4b0e429-3455-439...| 20180621|Lourdes M. Guada ...|
|[{30, CosmosDB Pa...|20180615|       MEXICO|       CosmosDB Conf|70fc0d83-768e-42b...| 20180613|Lourdes M. Guada ...|
|[{30, CosmosDB a ...|20180412|       MEXICO|Cloud Conference ...|f3c965b4-39e8-4e5...| 20180411|Zamir Alejandro A...|
+--------------------+--------+-------------+--------------------+--------------------+---------+--------------------+
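
By default Spark's JSON reader expects one complete JSON object per line (the JSON Lines layout). The `multiline` option tells it to parse each file as a single document instead, which is what a pretty-printed file needs. The sketch below reproduces the difference with Python's `json` module only; the two records are hypothetical and are not the contents of Example1.json.

```python
import json

# Hypothetical two-record document, pretty-printed across several lines
pretty = """[
  {"event_name": "Conf A", "init_date": 20180613},
  {"event_name": "Conf B", "init_date": 20180501}
]"""

def parse_line_by_line(text):
    """Parse each non-empty line as its own JSON document (JSON Lines style)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

try:
    parse_line_by_line(pretty)
except json.JSONDecodeError as err:
    print("line-by-line parsing fails:", err)

# Parsing the whole text as one document succeeds
records = json.loads(pretty)
print(len(records), "records")
```

The line-by-line attempt fails on the very first line, because `[` alone is not a valid JSON document.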

In [6]:
jsonDF.printSchema()

root
 |-- activities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- duration: long (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- end_date: long (nullable = true)
 |-- event_country: string (nullable = true)
 |-- event_name: string (nullable = true)
 |-- id: string (nullable = true)
 |-- init_date: long (nullable = true)
 |-- speaker: string (nullable = true)
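
The schema shows that `init_date` and `end_date` were inferred as `long`, because values such as 20180613 look like plain integers. A minimal sketch of turning such a value into a real date in plain Python follows; `yyyymmdd_to_date` is a hypothetical helper name.

```python
from datetime import date, datetime

def yyyymmdd_to_date(value):
    """Convert an integer such as 20180613 into a datetime.date."""
    return datetime.strptime(str(value), "%Y%m%d").date()

print(yyyymmdd_to_date(20180613))  # 2018-06-13
```

Inside Spark, the equivalent would typically be a cast of the column to string followed by `to_date` with the pattern `yyyyMMdd`, which keeps the conversion on the cluster instead of in the driver.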



# Working with text files

Use USDA_activity_dataset_csv

https://github.com/euwern/human-activity-dataset

**Loading the data in Colab**

In [7]:
!wget https://raw.githubusercontent.com/curso-iabd-uclm/hadoop/main/pig/casualties/casualties_rates.csv

--2026-01-15 11:22:10--  https://raw.githubusercontent.com/curso-iabd-uclm/hadoop/main/pig/casualties/casualties_rates.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2523392 (2.4M) [text/plain]
Saving to: ‘casualties_rates.csv’


2026-01-15 11:22:11 (54.4 MB/s) - ‘casualties_rates.csv’ saved [2523392/2523392]



**Read the CSV into a DataFrame, filter it, and save it**

In [8]:
data = spark.read.csv('casualties_rates.csv',sep = ";", inferSchema=True, header=True)

Keep only a subset of the columns


In [9]:
dataF=data.select("Country","Year","casualties_no","casualties100k_pop")

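Save only the selected columns. Spark writes a directory named `casualties_summary.csv` that contains part files, not a single CSV file.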

In [10]:
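# mode("overwrite") lets this cell be re-run without a "path already exists" error
dataF.write.mode("overwrite").csv("casualties_summary.csv")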
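
Because the write produces a directory of `part-*` files plus marker files such as `_SUCCESS`, a single local CSV has to be assembled separately if one is needed. The following is a sketch under that assumption; `merge_parts` and the output name `casualties_summary_single.csv` are hypothetical. Plain concatenation is safe here only because the write above does not emit a header row.

```python
import glob
import os
import shutil

def merge_parts(spark_dir, dest):
    """Concatenate Spark part files from spark_dir into dest; return how many were merged."""
    parts = sorted(glob.glob(os.path.join(spark_dir, "part-*")))
    with open(dest, "wb") as out:
        for part in parts:
            with open(part, "rb") as src:
                shutil.copyfileobj(src, out)
    return len(parts)

# Hypothetical usage once the write above has finished
if os.path.isdir("casualties_summary.csv"):
    n = merge_parts("casualties_summary.csv", "casualties_summary_single.csv")
    print("merged", n, "part files")
```

For data that does not fit comfortably on the driver machine, leaving the output as a directory and reading it back with Spark is usually the better choice.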

Now read this new output back into an RDD

In [11]:
rddL=sc.textFile("casualties_summary.csv")
rddL.collect()

['Albania,1987,21,6.71',
 'Albania,1987,16,5.19',
 'Albania,1987,14,4.83',
 'Albania,1987,1,4.59',
 'Albania,1987,9,3.28',
 'Albania,1987,1,2.81',
 'Albania,1987,6,2.15',
 'Albania,1987,4,1.56',
 'Albania,1987,1,0.73',
 'Albania,1987,0,0.0',
 'Albania,1987,0,0.0',
 'Albania,1987,0,0.0',
 'Albania,1988,2,5.49',
 'Albania,1988,17,5.33',
 'Albania,1988,1,4.48',
 'Albania,1988,14,4.46',
 'Albania,1988,4,2.85',
 'Albania,1988,8,2.71',
 'Albania,1988,3,2.03',
 'Albania,1988,5,1.91',
 'Albania,1988,5,1.79',
 'Albania,1988,4,1.41',
 'Albania,1988,0,0.0',
 'Albania,1988,0,0.0',
 'Albania,1989,2,8.89',
 'Albania,1989,18,6.35',
 'Albania,1989,15,4.71',
 'Albania,1989,6,4.22',
 'Albania,1989,12,3.71',
 'Albania,1989,7,2.43',
 'Albania,1989,5,1.67',
 'Albania,1989,2,0.75',
 'Albania,1989,1,0.67',
 'Albania,1989,0,0.0',
 'Albania,1989,0,0.0',
 'Albania,1989,0,0.0',
 'Albania,1992,12,3.49',
 'Albania,1992,9,3.41',
 'Albania,1992,5,3.13',
 'Albania,1992,7,2.85',
 'Albania,1992,7,2.39',
 'Albania,1992,
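
Each element of the RDD is still a raw text line. A minimal sketch of turning one line into typed fields follows, assuming every line has exactly the four columns selected earlier and no empty values; `parse_record` is a hypothetical helper.

```python
import csv

def parse_record(line):
    """Split one CSV line into (country, year, casualties_no, casualties100k_pop)."""
    country, year, count, rate = next(csv.reader([line]))
    return country, int(year), int(count), float(rate)

print(parse_record("Albania,1987,21,6.71"))
```

Applied to the RDD, this would look like `rddL.map(parse_record)`. Using `csv.reader` instead of `line.split(",")` keeps quoted fields that contain commas intact.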

# **Hive example**


Using Hive to create and read a table

In [12]:
from pyspark.sql import Row
from pyspark.sql import HiveContext

# HiveContext is deprecated since Spark 2.0 in favor of SparkSession.builder.enableHiveSupport()
sqlContext = HiveContext(sc)

# Build a small DataFrame of (name, age) rows from an in-memory list
test_list = [('A', 25), ('B', 20), ('C', 25), ('D', 18)]
rdd = sc.parallelize(test_list)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
schemaPeople = sqlContext.createDataFrame(people)

# Register it as a temporary view so it can be queried with SQL
schemaPeople.createOrReplaceTempView("test_table")
sqlContext.sql("show tables").show()



+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         |test_table|       true|
+---------+----------+-----------+



Run the query

In [13]:
sqlContext.sql("Select * from test_table").show()

+----+---+
|name|age|
+----+---+
|   A| 25|
|   B| 20|
|   C| 25|
|   D| 18|
+----+---+



**Load the JSON into Hive and query it**

Let's load Example1.json with Hive and run a SELECT statement against it.


In [14]:
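from pyspark.sql import HiveContext
hiveCtx = HiveContext(sc)
ex1 = hiveCtx.read.option("multiline", "true").json("Example1.json")
# createOrReplaceTempView replaces the deprecated registerTempTable
ex1.createOrReplaceTempView("ex1")
# show() returns None, so keep the DataFrame in results and display it separately
results = hiveCtx.sql("SELECT event_name, init_date FROM ex1")
results.show()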



+--------------------+---------+
|          event_name|init_date|
+--------------------+---------+
|       CosmosDB Conf| 20180613|
|    Vue.js Conf 2018| 20180501|
|           IoT Space| 20180621|
|       CosmosDB Conf| 20180613|
|Cloud Conference ...| 20180411|
+--------------------+---------+

