In [15]:
# -*- coding: utf-8 -*-
"""

DATAFRAMES
São coleções distribuídas de dados agrupados em colunas nomeadas. Ou seja, são estruturados (ao contrário dos RDDS) - como se fossem uma coleção de tabelas.

"""


'\n-----------------------------------------------------------------------------\n\n                   Spark with Python             \n                    \n                 Dataframe & Spark SQL\n-----------------------------------------------------------------------------\n'

In [34]:
from pyspark.sql import SparkSession
SpSession = SparkSession.builder.appName("Trabalhando com Data Frames").getOrCreate()


In [39]:

#Cria um dataframe a partir de um arquivo json
empDf = SpSession.read.json("customerData.json")
empDf.show()
empDf.printSchema()


+---+------+------+-----------------+------+
|age|deptid|gender|             name|salary|
+---+------+------+-----------------+------+
| 32|   100|  male|Benjamin Garrison|  3000|
| 40|   200|  male|    Holland Drake|  4500|
| 26|   100|  male|  Burks Velasquez|  2700|
| 51|   100|female|    June Rutledge|  4300|
| 44|   200|  male|    Nielsen Knapp|  6500|
+---+------+------+-----------------+------+

root
 |-- age: string (nullable = true)
 |-- deptid: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: string (nullable = true)



In [40]:
#Consulta o data frame
empDf.select("name").show()
empDf.filter(empDf["age"] == 40).show()
empDf.groupBy("gender").count().show()
empDf.groupBy("deptid").\
    agg({"salary": "avg", "age": "max"}).show()



+-----------------+
|             name|
+-----------------+
|Benjamin Garrison|
|    Holland Drake|
|  Burks Velasquez|
|    June Rutledge|
|    Nielsen Knapp|
+-----------------+

+---+------+------+-------------+------+
|age|deptid|gender|         name|salary|
+---+------+------+-------------+------+
| 40|   200|  male|Holland Drake|  4500|
+---+------+------+-------------+------+

+------+-----+
|gender|count|
+------+-----+
|female|    1|
|  male|    4|
+------+-----+

+------+------------------+--------+
|deptid|       avg(salary)|max(age)|
+------+------------------+--------+
|   200|            5500.0|      44|
|   100|3333.3333333333335|      51|
+------+------------------+--------+



In [41]:
#Cria um data frame a partir de uma lista
deptList = [{'name': 'Sales', 'id': "100"},\
 { 'name':'Engineering','id':"200" }]
deptDf = SpSession.createDataFrame(deptList)
deptDf.show()
 


+---+-----------+
| id|       name|
+---+-----------+
|100|      Sales|
|200|Engineering|
+---+-----------+



In [42]:
#junção de data frames
empDf.join(deptDf, empDf.deptid == deptDf.id).show()
 


+---+------+------+-----------------+------+---+-----------+
|age|deptid|gender|             name|salary| id|       name|
+---+------+------+-----------------+------+---+-----------+
| 51|   100|female|    June Rutledge|  4300|100|      Sales|
| 26|   100|  male|  Burks Velasquez|  2700|100|      Sales|
| 32|   100|  male|Benjamin Garrison|  3000|100|      Sales|
| 44|   200|  male|    Nielsen Knapp|  6500|200|Engineering|
| 40|   200|  male|    Holland Drake|  4500|200|Engineering|
+---+------+------+-----------------+------+---+-----------+



In [43]:
#operações em cascata
empDf.filter(empDf["age"] >30).join(deptDf, \
        empDf.deptid == deptDf.id).\
        groupBy("deptid").\
        agg({"salary": "avg", "age": "max"}).show()
        


+------+-----------+--------+
|deptid|avg(salary)|max(age)|
+------+-----------+--------+
|   200|     5500.0|      44|
|   100|     3650.0|      51|
+------+-----------+--------+



In [44]:
#............................................................................
##   Criando dataframes a partir de um rdd
#............................................................................

#Inicializa SparkSession e SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext

from pyspark.sql import Row

#Cria a Spark Session
SpSession = SparkSession \
    .builder \
    .master("local[2]") \
    .appName("Dataframe a partir de um RDD & SQL") \
    .config("spark.executor.memory", "1g") \
    .config("spark.cores.max","2") \
    .config("spark.sql.warehouse.dir", "file:///c:/temp/spark-warehouse")\
    .getOrCreate()

#Obtém  a Spark Context do Spark Session    
SpContext = SpSession.sparkContext

lines = SpContext.textFile("auto-data.csv")


In [45]:
#remove a primeira linha
datalines = lines.filter(lambda x: "FUELTYPE" not in x)
datalines.count()



197

In [46]:
parts = datalines.map(lambda l: l.split(","))
autoMap = parts.map(lambda p: Row(make=p[0],\
         body=p[4], hp=int(p[7])))


In [47]:
# Registra o dataframe como tabela
autoDf = SpSession.createDataFrame(autoMap)
autoDf.show()




+----------+---------+---+
|      make|     body| hp|
+----------+---------+---+
|    subaru|hatchback| 69|
| chevrolet|hatchback| 48|
|     mazda|hatchback| 68|
|    toyota|hatchback| 62|
|mitsubishi|hatchback| 68|
|     honda|hatchback| 60|
|    nissan|    sedan| 69|
|     dodge|hatchback| 68|
|  plymouth|hatchback| 68|
|     mazda|hatchback| 68|
|mitsubishi|hatchback| 68|
|     dodge|hatchback| 68|
|  plymouth|hatchback| 68|
| chevrolet|hatchback| 70|
|    toyota|hatchback| 62|
|     dodge|hatchback| 68|
|     honda|hatchback| 58|
|    toyota|hatchback| 62|
|     honda|hatchback| 76|
| chevrolet|    sedan| 70|
+----------+---------+---+
only showing top 20 rows



In [48]:
#............................................................................
##   Cria dataframe a partir de um arquivo CSV
#...........................................................................
autoDf1 = SpSession.read.csv("auto-data.csv",header=True)
autoDf1.show()



+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|      MAKE|FUELTYPE|ASPIRE|DOORS|     BODY|DRIVE|CYLINDERS| HP| RPM|MPG-CITY|MPG-HWY|PRICE|
+----------+--------+------+-----+---------+-----+---------+---+----+--------+-------+-----+
|    subaru|     gas|   std|  two|hatchback|  fwd|     four| 69|4900|      31|     36| 5118|
| chevrolet|     gas|   std|  two|hatchback|  fwd|    three| 48|5100|      47|     53| 5151|
|     mazda|     gas|   std|  two|hatchback|  fwd|     four| 68|5000|      30|     31| 5195|
|    toyota|     gas|   std|  two|hatchback|  fwd|     four| 62|4800|      35|     39| 5348|
|mitsubishi|     gas|   std|  two|hatchback|  fwd|     four| 68|5500|      37|     41| 5389|
|     honda|     gas|   std|  two|hatchback|  fwd|     four| 60|5500|      38|     42| 5399|
|    nissan|     gas|   std|  two|    sedan|  fwd|     four| 69|5200|      31|     37| 5499|
|     dodge|     gas|   std|  two|hatchback|  fwd|     four| 68|5500| 

In [49]:
#............................................................................
##   Criando e trabalhando com tabelas temporárias
#............................................................................

autoDf.createOrReplaceTempView("autos")
SpSession.sql("select * from autos where hp > 200").show()



+-------+-----------+---+
|   make|       body| hp|
+-------+-----------+---+
|porsche|    hardtop|207|
|porsche|    hardtop|207|
| jaguar|      sedan|262|
|porsche|convertible|207|
+-------+-----------+---+



In [50]:
#Registrando um dataframe como tabela e executando comandos SQL
empDf.createOrReplaceTempView("employees")
SpSession.sql("select * from employees where salary > 4000").show()



+---+------+------+-------------+------+
|age|deptid|gender|         name|salary|
+---+------+------+-------------+------+
| 40|   200|  male|Holland Drake|  4500|
| 51|   100|female|June Rutledge|  4300|
| 44|   200|  male|Nielsen Knapp|  6500|
+---+------+------+-------------+------+



In [51]:
#Convertendo para um dataframe Pandas
empPands = empDf.toPandas()
for index, row in empPands.iterrows():
    print(row["salary"])



3000
4500
2700
4300
6500


In [33]:
#............................................................................
##   Trabalhando com Banco de Dados

#Importe a classe que conecta com o seu banco de dados aqui
#import classeConectaSgdbXpto 
# ou

#............................................................................
#Certifique-se de que o spark classpaths estão setados corretamente no 
#arquivo spark-defaults.conf para incluir os arquivos de driver
    
demoDf = SpSession.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/demo",
    driver = "com.mysql.jdbc.Driver",
    dbtable = "demotable",
    user="root",
    password="").load()
    
demoDf.show()

Py4JJavaError: An error occurred while calling o254.load.
: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver
	at java.net.URLClassLoader.findClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at java.lang.ClassLoader.loadClass(Unknown Source)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:99)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:99)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:35)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:32)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:344)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:297)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:286)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:286)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:221)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
