In [1]:
import os  # required for os.environ below; without it this cell raises NameError on a fresh kernel
import sys

import numpy as np
import pandas as pd

# Environment report. `spark` (the SparkSession) is not created in this notebook;
# it is assumed to be injected by the PySpark kernel/launcher.
# Use .get so the report still runs when PYTHONPATH is not set.
print("PYTHONPATH: {}".format(os.environ.get('PYTHONPATH', '<not set>')))
print("Spark: {}".format(spark.version))
print("Python: {}".format(sys.version))
spark.sparkContext

PYTHONPATH: /opt/conda/lib/python3.7/site-packages:/opt/spark3/python:/opt/spark3/python/lib/py4j-0.10.9-src.zip
Spark: 3.0.2
Python: 3.7.10 | packaged by conda-forge | (default, Feb 19 2021, 16:07:37) 
[GCC 9.3.0]


## Show Hive Tables

In [2]:
# List the tables of the current database as a pandas DataFrame
(spark
    .sql("show tables")
    .toPandas())

Unnamed: 0,database,tableName,isTemporary
0,default,access_log,False
1,default,bank,False
2,default,borrar,False
3,default,crimes,False
4,default,day_table,False
5,default,e150408_peliculas,False
6,default,e150408videos,False
7,default,employees,False
8,default,employees_bucket,False
9,default,employees_ext,False


## Leo, guardo y consulto tabla externa Parquet

In [3]:
# Small sample of per-day sales (there is intentionally no 'thu' row)
days = pd.DataFrame(
    {
        "day": ['mon', 'tue', 'wed', 'fri'],
        "sales": [5, 1, 2, 3],
    }
)
df = spark.createDataFrame(days)
df.toPandas()

Unnamed: 0,day,sales
0,mon,5
1,tue,1
2,wed,2
3,fri,3


In [4]:
# Write the sample rows as Parquet files, replacing whatever is already at dataDir
dataDir = "/tmp/days_parquet"
df.write.mode("overwrite").parquet(dataDir)

# (Re)create an external Hive table whose data lives at dataDir; the declared
# columns are expected to match the Parquet schema written above
spark.sql("DROP TABLE IF EXISTS days_ext")
spark.sql(f"CREATE EXTERNAL TABLE days_ext (day string, sales bigint) STORED AS PARQUET LOCATION '{dataDir}'")

DataFrame[]

In [5]:
# Query the external table, sorted by day
(spark.sql("SELECT * FROM days_ext")
      .orderBy('day')
      .toPandas())

Unnamed: 0,day,sales
0,fri,3
1,mon,5
2,tue,1
3,wed,2


In [6]:
from pyspark.sql.functions import *  # NOTE(review): wildcard import can shadow builtins (e.g. sum, max, min); kept because other cells may rely on it

# Same days as `df`, each sales value doubled; the alias keeps the column
# name 'sales' so both DataFrames share the same schema
df2 = df.select(
    'day',
    (col('sales') * 2).alias('sales'),
)
df2.toPandas()

Unnamed: 0,day,sales
0,mon,10
1,tue,2
2,wed,4
3,fri,6


In [7]:
# Append the doubled rows into the same Parquet directory that backs days_ext
df2.write.mode("append").parquet(dataDir)

In [8]:
# Query the table again: it now returns both the original and the appended rows
(spark.sql("SELECT * FROM days_ext")
      .orderBy('day')
      .toPandas())

Unnamed: 0,day,sales
0,fri,6
1,fri,3
2,mon,5
3,mon,10
4,tue,1
5,tue,2
6,wed,4
7,wed,2


In [9]:
# Drop the table definition, then list the remaining tables to confirm it is gone
spark.sql("DROP TABLE IF EXISTS days_ext")
(spark
    .sql('show tables')
    .toPandas())

Unnamed: 0,database,tableName,isTemporary
0,default,access_log,False
1,default,bank,False
2,default,borrar,False
3,default,crimes,False
4,default,day_table,False
5,default,e150408_peliculas,False
6,default,e150408videos,False
7,default,employees,False
8,default,employees_bucket,False
9,default,employees_ext,False


In [10]:
%%bash
# Delete the Parquet files at /tmp/days_parquet (the location used for days_ext);
# dropping an EXTERNAL table removes only its metastore entry, not its data.
# -r: recursive, -f: no error if the path is missing, -skipTrash: delete permanently.
hdfs dfs -rm -r -f -skipTrash /tmp/days_parquet

Deleted /tmp/days_parquet


# Guardar un DataFrame como tabla Hive

In [11]:
# Column names come from the .domain file: the text before ':' on each line
house_dom = (
    spark.sparkContext
    .textFile("/data/CaliforniaHousing/cal_housing.domain")
    .map(lambda line: line.split(':')[0])
)
columns = house_dom.collect()

# The .data file is read without a header, so replace Spark's default
# positional column names with the names collected above
house_data = (
    spark.read
    .csv("/data/CaliforniaHousing/cal_housing.data", inferSchema=True)
    .toDF(*columns)
)
house_data.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housingMedianAge: double (nullable = true)
 |-- totalRooms: double (nullable = true)
 |-- totalBedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- medianIncome: double (nullable = true)
 |-- medianHouseValue: double (nullable = true)



In [12]:
# Persist the housing data as an ORC table named "housing", replacing it if it already exists
house_data.write.saveAsTable("housing", format="orc", mode="overwrite")

In [13]:
# Peek at the first five rows of the new table
(spark.sql("SELECT * FROM housing")
      .limit(5)
      .toPandas())

Unnamed: 0,longitude,latitude,housingMedianAge,totalRooms,totalBedrooms,population,households,medianIncome,medianHouseValue
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0


## Registrar un DataFrame como TempView y guardarlo con CTAS

In [14]:
from pyspark.sql import Row

# Row factory with fields (key, value); build records (n, "val_<n>") for n = 1..4
Record = Row("key", "value")
records_df = spark.createDataFrame(
    [Record(n, "val_" + str(n)) for n in range(1, 5)]
)

# Expose the DataFrame to SQL as a session-scoped temporary view and read it back
records_df.createOrReplaceTempView("records_view")
(spark
    .sql("SELECT * FROM records_view")
    .toPandas())

Unnamed: 0,key,value
0,1,val_1
1,2,val_2
2,3,val_3
3,4,val_4


In [15]:
# Remove any previous definition of `records` so the CTAS further down can recreate it
spark.sql(sqlQuery="DROP TABLE IF EXISTS records")

DataFrame[]

In [16]:
%%bash
# Delete any data left at /tmp/records (the LOCATION used for the `records` table)
# so the table is rebuilt from an empty directory; dropping the external table
# does not remove these files.
# -r: recursive, -f: no error if the path is missing, -skipTrash: delete permanently.
hdfs dfs -rm -r -f -skipTrash /tmp/records

Deleted /tmp/records


In [17]:
# CTAS: create the external ORC table `records` at /tmp/records, populated from the temp view
spark.sql(
    sqlQuery="CREATE EXTERNAL TABLE records STORED AS orc LOCATION '/tmp/records' AS SELECT * FROM records_view"
)

DataFrame[]

In [18]:
# List every table, temporary views included. Use toPandas() (as the other
# listing cells do) rather than .show(): show() prints at most 20 rows by
# default, and this listing is already at 20 entries, so any new table would
# cut off the temporary view `records_view`, which is listed last.
spark.sql("SHOW TABLES").toPandas()

+--------+-----------------+-----------+
|database|        tableName|isTemporary|
+--------+-----------------+-----------+
| default|       access_log|      false|
| default|             bank|      false|
| default|           borrar|      false|
| default|           crimes|      false|
| default|        day_table|      false|
| default|e150408_peliculas|      false|
| default|    e150408videos|      false|
| default|        employees|      false|
| default| employees_bucket|      false|
| default|    employees_ext|      false|
| default|            geoip|      false|
| default|          housing|      false|
| default|   partition_demo|      false|
| default|          records|      false|
| default|         students|      false|
| default| turismo_turistas|      false|
| default|           weblog|      false|
| default|       weblog_ext|      false|
| default|     word_counter|      false|
|        |     records_view|       true|
+--------+-----------------+-----------+



In [19]:
# Read back the CTAS table to confirm it holds the rows of records_view
(spark
    .sql("SELECT * FROM records")
    .toPandas())

Unnamed: 0,key,value
0,1,val_1
1,2,val_2
2,3,val_3
3,4,val_4


## saveAsTable

Recomiendo no usarlo porque es inestable y da error en varias situaciones.