In [1]:
import pandas as pd
import numpy as np
import os
import pyspark
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.types import * ## Librería para realizacion de cast
from pyspark.sql.functions import *

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext, SparkSession, SQLContext

In [4]:
def getContext(procedure, source, data):
    """Build (or reuse) the SparkContext and wrap it in an SQLContext.

    Args:
        procedure: label of the running procedure, embedded in the app name.
        source: label of the data source, embedded in the app name.
        data: label of the dataset, embedded in the app name.

    Returns:
        Tuple ``(sqlContext, sc)``: the SQLContext and the underlying SparkContext.
    """
    app_name = "Proc:{0} Source:{1} Data:{2}".format(procedure, source, data)
    spark_conf = SparkConf().setAppName(app_name)
    # Disable Hadoop output-spec validation (the original notes this as an HDFS setting).
    spark_conf.set("spark.hadoop.validateOutputSpecs", "false")
    # getOrCreate returns the already-active SparkContext when one exists;
    # in that case spark_conf is not applied to it.
    spark_context = SparkContext.getOrCreate(conf=spark_conf)
    return SQLContext(spark_context), spark_context

# Create (or reuse) the notebook's SparkContext and its SQLContext wrapper.
sqlContext, sc = getContext(procedure='TrainingSpark', source='local', data='local')

### Conexión a fuentes de información

In [None]:
# Reachability check for the Impala endpoint before connecting.
# The original `% telnet 10.152.145.98 21050` line is not valid IPython (there is
# no `%telnet` magic), and an interactive telnet session would block the kernel.
import socket

IMPALA_HOST = '10.152.145.98'
IMPALA_PORT = 21050


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to ``host:port`` succeeds within ``timeout`` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Uncomment to query Impala once the port is reachable:
#from impala.dbapi import connect
#conn = connect('localhost', port=21050)
#cursor = conn.cursor()
#cursor.execute('SHOW DATABASES')
#cursor.fetchall()

port_is_open(IMPALA_HOST, IMPALA_PORT)

In [None]:
# Hive connection through PyHive (install it with `%pip install pyhive` if missing).
from pyhive import hive

hive_host = 'elbahidata00.risorse.enel'
hive_port = 10000
conn = hive.connect(hive_host, port=hive_port)

# Smoke test (uncomment to list the available databases):
#cursor = conn.cursor()
#cursor.execute('SHOW DATABASES')
#cursor.fetchall()

In [None]:
# Hive-enabled SparkSession.
# NOTE(review): getContext(...) already created a SparkContext/SQLContext earlier in
# the notebook, so getOrCreate() returns the existing session. Static settings such as
# enableHiveSupport() / hive.metastore.uris may therefore be ignored. The check below
# surfaces that instead of silently listing the in-memory catalog.
# NOTE(review): port 10000 is the same port used for hive.connect(); a metastore
# Thrift URI normally points at the metastore service (commonly port 9083) — confirm.
HIVE_METASTORE_URI = "thrift://elbahidata00.risorse.enel:10000"
#HIVE_METASTORE_URI = "thrift://elbahidata01.risorse.enel:10000"  # alternate node

sparkSession = (
    SparkSession.builder
    .appName('example-pyspark-read-from-hive')
    .config("hive.metastore.uris", HIVE_METASTORE_URI)
    .enableHiveSupport()
    .getOrCreate()
)

catalog_impl = sparkSession.conf.get("spark.sql.catalogImplementation", "in-memory")
if catalog_impl != "hive":
    print("WARNING: session catalog is '{}', not 'hive'; Hive tables will not be visible."
          .format(catalog_impl))

sparkSession.sql('show tables').show()

### Creación de sesión en Spark

In [None]:
# Get the active SparkSession (getOrCreate reuses an existing one if present).
spark = SparkSession.builder.getOrCreate()
#print(spark)

In [None]:
spark

### Copia de información desde HDFS al sistema de archivos local

In [None]:
os.system('hdfs dfs -get /user/co_digital_hub/raw/forcebeat/tdc/ /carga_datos_co/jupyter_notebook/tmp/Data')

In [None]:
os.listdir('/carga_datos_co/jupyter_notebook/tmp/Data')

### Lectura de Parquet desde el sistema de archivos local a un DataFrame de Spark

In [None]:
# Read the local copy of the TDC parquet folder into a Spark DataFrame.
df = spark.read.parquet('/carga_datos_co/jupyter_notebook/tmp/Data/tdc')

In [None]:
# First row of the DataFrame.
df.head()

In [None]:
# NOTE(review): spark.read.load() reads from a path; a registered view would be read
# with spark.table(...) instead.
#df3 = spark.read.load("tabla_temporal")

### Lectura de CSV (separado por `~`) como RDD de texto

In [None]:
# Copy the circuitos_bogota folder from HDFS to the local filesystem, then list it.
# - makedirs: if the local Data directory were missing, `hdfs dfs -get` would create
#   the copy *as* Data/ instead of Data/circuitos_bogota/.
# - Skip when already copied: `hdfs dfs -get` refuses to overwrite an existing local
#   target, so a second Run All would fail.
# - check=True: raise on a failed copy instead of discarding os.system's exit code.
import subprocess

local_data_dir = '/carga_datos_co/jupyter_notebook/tmp/Data'
os.makedirs(local_data_dir, exist_ok=True)
if not os.path.exists(os.path.join(local_data_dir, 'circuitos_bogota')):
    subprocess.run(
        ['hdfs', 'dfs', '-get', '/user/co_digital_hub/raw/forcebeat/circuitos_bogota/', local_data_dir],
        check=True,
    )
os.listdir(local_data_dir)

In [None]:
#df = spark.read.format("csv").option("header", "true").load("circuitos_bogota.csv")
lines = sc.textFile("/carga_datos_co/jupyter_notebook/tmp/Data/circuitos_bogota/circuitos_bogota.csv")

In [None]:
lines.collect() ## Mostrar información RDD

In [None]:
lines.first()

In [None]:
parts = lines.map(lambda l: l.split("~"))

In [None]:
#rdd_distribuido = parts.map(lambda p: Row(name=p[0],age=int(p[1])))

In [None]:
#dd_distribuido.collect()

In [None]:
data_df = spark.createDataFrame(parts)

In [None]:
# Positional column names for the '~'-separated circuitos file.
headers = [ 'zona',
            'sub_zona',
            'cuadrante',
            'cod_contrato',
            'contrato',
            'circuito',
            'tipo_ubicacion',
            'grupo',
            'longitud',
            'latitud',
            'cod_contrato_ant']

# Rename the inferred columns positionally, keeping only the first len(headers) of them.
# The rename is built from Column objects rather than eval() over generated source text,
# which broke on any name containing a quote.
columns = data_df.columns
if len(columns) < len(headers):
    raise ValueError(
        "Expected at least {} columns, got {}: {}".format(len(headers), len(columns), columns)
    )
data_df = data_df.select([col(src).alias(dst) for src, dst in zip(columns, headers)])

In [None]:
data_df.head()

In [None]:
data_df.dtypes 

In [None]:
data_df.select(data_df.zona).describe().show()

In [None]:
data_df.columns

In [None]:
 data_df.count() 

In [None]:
data_df.select(data_df.zona).distinct().show()

In [None]:
data_df.select(data_df.zona).distinct().count()

In [None]:
 data_df.printSchema()

In [None]:
data_df.show()

In [None]:
data_df.explain()

In [None]:
data_df.head()

In [None]:
data_df.first()

In [None]:
data_df.take(5)

In [None]:
# NOTE(review): these casts are disabled, so grupo/longitud/latitud remain string
# columns (they come from str.split). The numeric expressions later in the notebook
# (e.g. longitud + 1, grupo > 5) therefore rely on Spark's implicit string->numeric
# coercion — confirm that is intended.
#data_df = data_df.withColumn('grupo', col('grupo').cast('float'))
#data_df = data_df.withColumn('longitud', col('longitud').cast('float'))
#data_df = data_df.withColumn('latitud', col('latitud').cast(FloatType()))

In [None]:
data_df = data_df.withColumn('circuito', regexp_replace('circuito', '[_]', ' '))

In [None]:
data_df.head()

In [None]:
data_df.schema 

In [None]:
data_df.select('sub_zona').show()

In [None]:
data_df.dtypes 

In [None]:
#df.select("circuito","zona",explode("cuadrante").alias("Cuadrante Alias")).select("zona","circuito","cuadrante").show()

### Select

In [None]:
data_df.select(data_df["zona"],data_df["longitud"]+ 1).filter(data_df["longitud"].isNotNull()).show()

In [None]:
data_df.filter(data_df["longitud"].isNotNull()).show()

In [None]:
data_df.select(data_df['grupo'] > 5).show()

### When

In [None]:
from pyspark.sql import functions as F
# 1 when cuadrante > 3, otherwise 0.
# .show() returns None, so its result is not assigned. The original `when = ...` rebound
# `when` (the pyspark function from `from pyspark.sql.functions import *`) to None.
data_df.select("contrato", F.when(data_df.cuadrante > 3, 1).otherwise(0)).show()

### Like

In [None]:
data_df.select("contrato",data_df.contrato.like("%COLOMBIA%")).show()

### Startswith - Endswith

In [None]:
data_df.select("contrato",data_df.contrato.startswith("CAM")).show()

In [None]:
data_df.select("circuito","contrato",data_df.contrato.startswith("CAM")).show()

In [None]:
data_df.select("circuito",data_df.circuito.endswith("DAS")).show()

### Substring

In [None]:
data_df.select(data_df.circuito.substr(3, 9).alias("Circuito 2")).show()

### Between

In [None]:
 data_df.select("cuadrante",data_df.cuadrante.between(4, 8)).filter(data_df.cuadrante.between(4, 8) == True).show()

### Renaming Columns

In [None]:
# Rename cod_contrato_ant (a no-op on re-run, once the column no longer exists).
data_df = data_df.withColumnRenamed('cod_contrato_ant', 'codigo_contrato_ant')

In [None]:
# Example of dropping columns (address/phoneNumber are placeholders, not columns of data_df):
## data_df = data_df.drop("address", "phoneNumber")
## data_df = data_df.drop(data_df.address).drop(data_df.phoneNumber)

In [None]:
data_df.show()

In [None]:
data_df.groupBy("circuito").count().show()

### Sort

In [None]:
data_df.groupBy("circuito").count().sort(data_df.circuito.desc()).show()

In [None]:
 data_df.sort("circuito", ascending=False).show()

In [None]:
data_df.orderBy(["circuito","cuadrante"],ascending=[0,1]).show()

In [None]:
data_df.na.fill(0).show()

In [None]:
data_df.na.drop().show()

### Registering DataFrames as Views

In [None]:
data_df.createGlobalTempView("circuitos_global_temp_view")

In [None]:
data_df.createTempView("circuitos_temp_view")

In [None]:
data_df.createOrReplaceTempView("circuitos_temp_view")

### Creación de base de datos en Spark

In [None]:
spark.sql("create database circuitos")

In [None]:
data_df.write.saveAsTable("'circuitos'.circuitos_table")

### Listar bases de datos en Spark

In [None]:
# Databases known to the session catalog.
print(spark.catalog.listDatabases())

### Listar tablas de una base de datos en Spark

In [None]:
# Tables in the `circuitos` database.
print(spark.catalog.listTables('circuitos'))

In [None]:
# Tables in the current database (temporary views are included too).
print(spark.catalog.listTables())

### Query Views Spark Catalog

In [None]:
df_query = spark.sql("SELECT * FROM circuitos_table").show()

In [None]:
df_query_temp = spark.sql("SELECT * FROM circuitos_temp_view").show()

### Conversión a otros formatos (RDD, JSON, pandas)

##### RDD

In [None]:
# Underlying RDD of Row objects.
rdd1 = data_df.rdd 

##### JSON

In [None]:
# First row serialized as a JSON string.
# NOTE(review): the name `json` shadows the standard-library module if it is imported here.
json = data_df.toJSON().first()

In [None]:
json

##### Pandas dataframe

In [None]:
# NOTE(review): toPandas() brings every row into driver memory — fine for a small file,
# consider limit() first for large data.
dataframe = data_df.toPandas()

### Write & Save to Files

In [None]:
 data_df.select("zona", "sub_zona").write.save("Circuitos.parquet")

In [None]:
 data_df.select("zona", "sub_zona").write.save("Circuitos.json",format="json")

In [None]:
##spark.stop()

### Temporal

In [None]:
hdfs_path = "/carga_datos_co/jupyter_notebook/tmp/Data"

In [None]:
pd_temp2 = pd.DataFrame(np.random.random(10))

In [None]:
# Create spark_temp from pd_temp
spark_temp = spark.createDataFrame(pd_temp2)

In [None]:
# Examine the tables in the catalog
print(spark.catalog.listTables())

In [None]:
# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView("tabla_temporal2")

In [None]:
# Examine the tables in the catalog again
print(spark.catalog.listTables())

In [None]:
#spark = SparkSession.builder.appName('abc').getOrCreate()
#df = pd.read_csv('path')
lista=[1,2,3,4,5,6]
# NOTE(review): a bare expression that is not the last line of a cell is not displayed.
lista
#spark.conf
# NOTE(review): 'filename.csv' looks like a placeholder path, and this rebinds `df`
# (earlier the TDC parquet DataFrame).
df=spark.read.csv('filename.csv',header=True)