In [2]:

# Configure Spark

# Import the session entry point
from pyspark.sql import SparkSession

# Build (or reuse) the session. The driver runs in client mode and one
# executor instance is requested; driver memory, executor memory and
# executor memory overhead are each set to 1g.
spark = (
    SparkSession.builder
    .config("spark.submit.deployMode", "client")
    .config("spark.executor.instances", "1")
    .config("spark.executor.memory", "1g")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memoryOverhead", "1g")
    .appName("prep_datos")
    .master("spark://bigdata-srv.fi.uncoma.edu.ar:7077")
    .getOrCreate()
)

# Show the effective configuration. getConf() is the public accessor; the
# private `_conf` attribute is an implementation detail of SparkContext.
spark.sparkContext.getConf().getAll()

[('spark.driver.port', '33602'),
 ('spark.app.id', 'app-20210616170511-0000'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'prep_datos'),
 ('spark.driver.memory', '1g'),
 ('spark.driver.host', '192.168.0.20'),
 ('spark.sql.warehouse.dir',
  'file:/home/usuarioFAI/notebooks/spark-warehouse'),
 ('spark.app.startTime', '1623873906193'),
 ('spark.executor.memory', '1g'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.instances', '1'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.master', 'spark://bigdata-srv.fi.uncoma.edu.ar:7077'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.executor.memoryOverhead', '1g')]

In [3]:
# Read the water-quality CSV from the HDFS server; the first row is the
# header and column types are inferred by Spark.
df = spark.read.csv(
    "hdfs://localhost:9000/cursoFAI/my_dataset/Water_Quality.csv",
    header="true",
    inferSchema="true",
)

In [4]:
import pandas as pd

In [5]:
# Print the column names and the types Spark assigned when reading the CSV
df.printSchema()

root
 |-- Sample ID: integer (nullable = true)
 |-- Grab ID: integer (nullable = true)
 |-- Profile ID: integer (nullable = true)
 |-- Sample Number: string (nullable = true)
 |-- Collect DateTime: string (nullable = true)
 |-- Depth (m): double (nullable = true)
 |-- Site Type: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Locator: string (nullable = true)
 |-- Site: string (nullable = true)
 |-- Parameter: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- Units: string (nullable = true)
 |-- QualityId: integer (nullable = true)
 |-- Lab Qualifier: string (nullable = true)
 |-- MDL: double (nullable = true)
 |-- RDL: double (nullable = true)
 |-- Text Value: string (nullable = true)
 |-- Sample Info: string (nullable = true)
 |-- Steward Note: string (nullable = true)
 |-- Replicates: integer (nullable = true)
 |-- Replicate Of: integer (nullable = true)
 |-- Method: string (nullable = true)
 |-- Date Analyzed: string (nullable = true)
 |-- Data So

In [6]:
# Replace the spaces in the column names with underscores so the columns
# are easier to work with
from pyspark.sql import functions as F

# toDF renames the columns positionally, so the original names are never
# parsed as column expressions (F.col would misinterpret a name that
# contains a dot as a nested-field reference).
df = df.toDF(*[name.replace(' ', '_') for name in df.columns])

In [7]:
# Print the schema again to check the column names after the rename
df.printSchema()

root
 |-- Sample_ID: integer (nullable = true)
 |-- Grab_ID: integer (nullable = true)
 |-- Profile_ID: integer (nullable = true)
 |-- Sample_Number: string (nullable = true)
 |-- Collect_DateTime: string (nullable = true)
 |-- Depth_(m): double (nullable = true)
 |-- Site_Type: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Locator: string (nullable = true)
 |-- Site: string (nullable = true)
 |-- Parameter: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- Units: string (nullable = true)
 |-- QualityId: integer (nullable = true)
 |-- Lab_Qualifier: string (nullable = true)
 |-- MDL: double (nullable = true)
 |-- RDL: double (nullable = true)
 |-- Text_Value: string (nullable = true)
 |-- Sample_Info: string (nullable = true)
 |-- Steward_Note: string (nullable = true)
 |-- Replicates: integer (nullable = true)
 |-- Replicate_Of: integer (nullable = true)
 |-- Method: string (nullable = true)
 |-- Date_Analyzed: string (nullable = true)
 |-- Data_So

In [8]:
# Number of rows per site type
df.groupBy('Site_Type').count().show()

+--------------------+------+
|           Site_Type| count|
+--------------------+------+
|         Large Lakes|791824|
|  Streams and Rivers|444848|
|    Swimming Beaches|   219|
|     Marine Offshore|299521|
|Freshwater - Unca...|   353|
|   Marine Intertidal| 51299|
+--------------------+------+



In [9]:
# Display the first 6 rows, one field per line (vertical) and without
# truncating long values
df.show(n=6, truncate=False, vertical=True)

-RECORD 0-------------------------------------------
 Sample_ID        | 17625                           
 Grab_ID          | null                            
 Profile_ID       | 20962                           
 Sample_Number    | L12637-8                        
 Collect_DateTime | 01/14/1998 12:48:00 PM          
 Depth_(m)        | null                            
 Site_Type        | Streams and Rivers              
 Area             | Cedar                           
 Locator          | A438                            
 Site             | Cedar River at SE Jones Rd      
 Parameter        | Storm Or Non-Storm              
 Value            | null                            
 Units            | none                            
 QualityId        | 2                               
 Lab_Qualifier    | null                            
 MDL              | null                            
 RDL              | null                            
 Text_Value       | S                         

In [10]:
# Discard the listed columns; every other column of df is kept
columns_to_drop = [
    'Grab_ID',
    'Profile_ID',
    'Sample_Number',
    'QualityId',
    'Lab_Qualifier',
    'MDL',
    'RDL',
    'Text_Value',
    'Sample_Info',
    'Steward_Note',
    'Replicates',
    'Replicate_Of',
    'Method',
    'Date_Analyzed',
    'Data_Source',
]
df_dropped = df.drop(*columns_to_drop)

In [11]:
# Print the schema of the reduced frame to see which columns remain
df_dropped.printSchema()

root
 |-- Sample_ID: integer (nullable = true)
 |-- Collect_DateTime: string (nullable = true)
 |-- Depth_(m): double (nullable = true)
 |-- Site_Type: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- Locator: string (nullable = true)
 |-- Site: string (nullable = true)
 |-- Parameter: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- Units: string (nullable = true)



In [12]:
# From here on we build two datasets, one for the lakes and one for the
# rivers. Each filtered subset is collected into a pandas DataFrame.
is_lake = df_dropped["Site_Type"] == "Large Lakes"
is_river = df_dropped["Site_Type"] == "Streams and Rivers"

df_lakes = df_dropped.filter(is_lake).toPandas()
df_rivers = df_dropped.filter(is_river).toPandas()

In [13]:
# Read the geographic-sites CSV from HDFS; the first row is the header and
# column types are inferred by Spark.
df_sites = spark.read.csv(
    "hdfs://localhost:9000/cursoFAI/my_dataset/WLRD_Sites.csv",
    header="true",
    inferSchema="true",
)

In [14]:
# Print the column names and inferred types of the sites dataset
df_sites.printSchema()

root
 |-- SiteName: string (nullable = true)
 |-- Locator: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Site Type: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- New Georeferenced Column: string (nullable = true)



In [15]:
# First replace the spaces in the column names with underscores.
# toDF renames positionally, so the original names are never parsed as
# column expressions (F.col would misinterpret a name containing a dot).
df_sites = df_sites.toDF(*[name.replace(' ', '_') for name in df_sites.columns])

In [16]:
# Print the schema again to check the column names after the rename
df_sites.printSchema()

root
 |-- SiteName: string (nullable = true)
 |-- Locator: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Site_Type: string (nullable = true)
 |-- Area: string (nullable = true)
 |-- New_Georeferenced_Column: string (nullable = true)



In [17]:
# Only the locator, site name, latitude, longitude and georeferenced point
# are of interest, so the remaining columns are dropped before the join
df_sites_dropped=df_sites.drop('Site_Type', 'Area')

In [18]:
# Print the schema of the reduced sites frame to see which columns remain
df_sites_dropped.printSchema()

root
 |-- SiteName: string (nullable = true)
 |-- Locator: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- New_Georeferenced_Column: string (nullable = true)



In [19]:
# Collect the sites table into pandas
df_sites_pd = df_sites_dropped.toPandas()

# Attach the site information to each subset, matching rows on "Locator"
# (pandas' default join type is used)
df_lakes_merged = df_lakes.merge(df_sites_pd, on="Locator")
df_rivers_merged = df_rivers.merge(df_sites_pd, on="Locator")

In [20]:
# Save one CSV per subset so they can be processed separately.
# index=False keeps the pandas row index out of the files; without it the
# index is written as an extra first column with an empty header.
df_lakes_merged.to_csv(r'/home/usuarioFAI/datasets/my_dataset/partitions/lakes.csv', index=False)
df_rivers_merged.to_csv(r'/home/usuarioFAI/datasets/my_dataset/partitions/rivers.csv', index=False)

In [21]:
# Upload the generated CSVs to HDFS
from hdfs import InsecureClient

# Connect to the WebHDFS endpoint
client = InsecureClient('http://localhost:9870', user='usuarioFAI')

# Upload both files with a single loop instead of a copy-pasted block per
# file. overwrite=True allows this cell to be re-run when the files already
# exist on HDFS; by default upload() refuses to replace an existing path.
# NOTE(review): the recorded run of this cell ended in a
# ChunkedEncodingError; the cause cannot be determined from this notebook,
# so confirm the upload completes against the target cluster.
for nombre_archivo in ("lakes.csv", "rivers.csv"):
    dir_archivo = "/home/usuarioFAI/datasets/my_dataset/partitions/" + nombre_archivo
    dir_destino = "/cursoFAI/my_dataset/partitions/" + nombre_archivo
    subir = client.upload(dir_destino, dir_archivo, overwrite=True)

ChunkedEncodingError: ('Connection broken: IncompleteRead(0 bytes read, 2 more expected)', IncompleteRead(0 bytes read, 2 more expected))

In [22]:
# Close the session. SparkSession.stop() stops the underlying SparkContext
# and also clears the session itself, rather than stopping only the context.
spark.stop()