In [23]:
from pyspark.sql import SparkSession

In [24]:
spark = SparkSession.builder.appName('Aula de Spark').enableHiveSupport().getOrCreate()

In [25]:
spark

# ACESSO HADOOP

In [26]:
df_hdfs = spark.read.csv(path='hdfs://///datalake/bronze/aula4', header=True)

In [27]:
df_hdfs.describe()

DataFrame[summary: string, usuario: string, item: string, avaliacao: string]

In [28]:
df_hdfs.show()

+-------+----+---------+
|usuario|item|avaliacao|
+-------+----+---------+
|    109|   9|        3|
|    174| 412|        1|
|      7| 208|        5|
|    371|  97|        5|
|    296| 255|        2|
|    280|  82|        2|
|    271| 275|        4|
|    110| 791|        2|
|     59| 926|        1|
|    217| 576|        1|
|    145| 665|        5|
|    334| 204|        4|
|     42| 568|        4|
|    200| 143|        5|
|     89| 387|        5|
|    311| 588|        4|
|    235| 269|        4|
|    287| 156|        5|
|    344| 204|        4|
|     43| 289|        4|
+-------+----+---------+
only showing top 20 rows



# ACESSO S3

In [31]:
df_s3 = spark.read.csv('s3a://camada-bronze/aula4')

In [32]:
df_s3.describe()

DataFrame[summary: string, _c0: string, _c1: string, _c2: string]

In [34]:
df_s3.show()

+-------+----+---------+
|    _c0| _c1|      _c2|
+-------+----+---------+
|usuario|item|avaliacao|
|    109|   9|        3|
|    174| 412|        1|
|      7| 208|        5|
|    371|  97|        5|
|    296| 255|        2|
|    280|  82|        2|
|    271| 275|        4|
|    110| 791|        2|
|     59| 926|        1|
|    217| 576|        1|
|    145| 665|        5|
|    334| 204|        4|
|     42| 568|        4|
|    200| 143|        5|
|     89| 387|        5|
|    311| 588|        4|
|    235| 269|        4|
|    287| 156|        5|
|    344| 204|        4|
+-------+----+---------+
only showing top 20 rows



# ACESSO ODBC

In [35]:
conn = 'jdbc:postgresql://postgres:5432/dvdrental'

In [37]:
prop = {
    "user":"admin",
    "password":"admin",
    "driver":"org.postgresql.Driver"
}

In [38]:
prop

{'user': 'admin', 'password': 'admin', 'driver': 'org.postgresql.Driver'}

In [39]:
df_city = spark.read.jdbc(url=conn,properties=prop,table='public.city')

In [41]:
df_city.show()

+-------+--------------------+----------+-------------------+
|city_id|                city|country_id|        last_update|
+-------+--------------------+----------+-------------------+
|      2|                Abha|        82|2006-02-15 09:45:25|
|      3|           Abu Dhabi|       101|2006-02-15 09:45:25|
|      4|                Acua|        60|2006-02-15 09:45:25|
|      5|               Adana|        97|2006-02-15 09:45:25|
|      6|         Addis Abeba|        31|2006-02-15 09:45:25|
|      7|                Aden|       107|2006-02-15 09:45:25|
|      8|               Adoni|        44|2006-02-15 09:45:25|
|      9|          Ahmadnagar|        44|2006-02-15 09:45:25|
|     10|            Akishima|        50|2006-02-15 09:45:25|
|     11|               Akron|       103|2006-02-15 09:45:25|
|     12|              al-Ayn|       101|2006-02-15 09:45:25|
|     13|           al-Hawiya|        82|2006-02-15 09:45:25|
|     14|           al-Manama|        11|2006-02-15 09:45:25|
|     15

In [42]:
df_city.count()

652

# QUERY NO ODBC

In [43]:
query = '(select c.customer_id, c.first_name, c.email, c2.city \
from public.customer c \
inner join public.address a on c.address_id = a.address_id \
inner join public.city c2 on c2.city_id = a.city_id) as tab'

In [44]:
df_query = spark.read.jdbc(url=conn,properties=prop, table=query)

In [46]:
df_query.show(truncate=False)

+-----------+----------+-----------------------------------+---------------+
|customer_id|first_name|email                              |city           |
+-----------+----------+-----------------------------------+---------------+
|524        |Jared     |jared.ely@sakilacustomer.org       |Purwakarta     |
|1          |Mary      |mary.smith@sakilacustomer.org      |Sasebo         |
|2          |Patricia  |patricia.johnson@sakilacustomer.org|San Bernardino |
|3          |Linda     |linda.williams@sakilacustomer.org  |Athenai        |
|4          |Barbara   |barbara.jones@sakilacustomer.org   |Myingyan       |
|5          |Elizabeth |elizabeth.brown@sakilacustomer.org |Nantou         |
|6          |Jennifer  |jennifer.davis@sakilacustomer.org  |Laredo         |
|7          |Maria     |maria.miller@sakilacustomer.org    |Kragujevac     |
|8          |Susan     |susan.wilson@sakilacustomer.org    |Hamilton       |
|9          |Margaret  |margaret.moore@sakilacustomer.org  |Masqat         |

In [47]:
df_query.count()

599

In [49]:
df_query.show(2)

+-----------+----------+--------------------+----------+
|customer_id|first_name|               email|      city|
+-----------+----------+--------------------+----------+
|        524|     Jared|jared.ely@sakilac...|Purwakarta|
|          1|      Mary|mary.smith@sakila...|    Sasebo|
+-----------+----------+--------------------+----------+
only showing top 2 rows



# GRAVANDO NO DATALAKE

In [54]:
df_query.write.csv('hdfs:///datalake/bronze/aula4', header = True, sep=';', mode='append')

In [57]:
df_query2 = df_query.repartition(3)

In [58]:
df_query2.write.csv('hdfs:///datalake/bronze/aula4/gravacao', header = True, sep=';', mode='overwrite')

In [61]:
df_query2.write.json('s3a://camada-bronze/aula4/gravacao', mode='append')

In [62]:
df_query2.write.parquet('s3a://camada-bronze/aula4/gravacao', mode='append')

# GRAVANDO NO HIVE

In [63]:
df_query.write.format('hive').saveAsTable('default.query')

In [64]:
df_query.write.format('hive').insertInto('default.query')

In [66]:
df_query.write.mode('overwrite').format('hive').insertInto('default.query')

In [70]:
# OBS AO GRAVAR NO HIVE VC NAO ESCOLHE ONDE A TABELA É SALVA, 
# RECOMENDA-SE CRIAR UMA TABELA COM A FONTE ANTES
# SPARK SO PARA POPULAR E NAO CRIAR

In [71]:
# LENDO HIVE

In [73]:
df_hive = spark.read.table('aula.tabelaexterna')

In [74]:
df_hive.show()

+---+---------+-----+
| id|     nome|idade|
+---+---------+-----+
|  1|     joao|   30|
|  2|   marino|   40|
|  3|   daniel|   50|
|  4|     caio|   25|
|  1|  Richard|   68|
|  2|   Ashley|   35|
|  3| Cheyenne|   37|
|  4|  Eduardo|   50|
|  5|  Felicia|   49|
|  6|   Travis|   39|
|  7|Elizabeth|   61|
|  8|    Donna|   33|
|  9|   Nicole|   54|
| 10|    David|   29|
| 11|   Rachel|   28|
| 12|  Kirsten|   34|
| 13| Jennifer|   51|
| 14|   Amanda|   27|
| 15|   Joseph|   77|
| 16|    Faith|   37|
+---+---------+-----+
only showing top 20 rows



In [75]:
df_hive.count()

1000004

In [76]:
df_hive = spark.read.table('default.query')

In [None]:
# cruzando tabela hive com tabela s3

In [77]:
df_hive.join(df_query, df_query.customer_id == df_hive.customer_id).show()

+-----------+----------+--------------------+---------------+-----------+----------+--------------------+---------------+
|customer_id|first_name|               email|           city|customer_id|first_name|               email|           city|
+-----------+----------+--------------------+---------------+-----------+----------+--------------------+---------------+
|        524|     Jared|jared.ely@sakilac...|     Purwakarta|        524|     Jared|jared.ely@sakilac...|     Purwakarta|
|          1|      Mary|mary.smith@sakila...|         Sasebo|          1|      Mary|mary.smith@sakila...|         Sasebo|
|          2|  Patricia|patricia.johnson@...| San Bernardino|          2|  Patricia|patricia.johnson@...| San Bernardino|
|          3|     Linda|linda.williams@sa...|        Athenai|          3|     Linda|linda.williams@sa...|        Athenai|
|          4|   Barbara|barbara.jones@sak...|       Myingyan|          4|   Barbara|barbara.jones@sak...|       Myingyan|
|          5| Elizabeth|

In [86]:
df_total = spark.sql('select count(*) from parquet.`s3a://camada-bronze/aula4/gravacao`')

In [87]:
df_total.show()

+--------+
|count(1)|
+--------+
|     599|
+--------+



In [89]:
df_total.write.jdbc(url=conn, properties=prop, table = 'public.aula4')

# APIs

In [90]:
import requests

In [116]:
retorno = requests.get('https://catfact.ninja/fact')

In [117]:
retorno.status_code

200

In [125]:
retorno.text

'{"fact":"Most cats give birth to a litter of between one and nine kittens. The largest known litter ever produced was 19 kittens, of which 15 survived.","length":142}'

In [127]:
l = []
for x in range (10):
    print(x)
    retorno = requests.get('https://catfact.ninja/fact')
    l.append(retorno.json())
    

0
1
2
3
4
5
6
7
8
9


In [128]:
l

[{'fact': 'According to Hebrew legend, Noah prayed to God for help protecting all the food he stored on the ark from being eaten by rats. In reply, God made the lion sneeze, and out popped a cat.',
  'length': 184},
 {'fact': 'The longest living cat on record according to the Guinness Book belongs to the late Creme Puff of Austin, Texas who lived to the ripe old age of 38 years and 3 days!',
  'length': 165},
 {'fact': "The way you treat kittens in the early stages of it's life will render it's personality traits later in life.",
  'length': 109},
 {'fact': 'You check your cats pulse on the inside of the back thigh, where the leg joins to the body. Normal for cats: 110-170 beats per minute.',
  'length': 134},
 {'fact': "Cat's urine glows under a black light.", 'length': 38},
 {'fact': 'Cats have 300 million neurons; dogs have about 160 million',
  'length': 58},
 {'fact': 'Cat bites are more likely to become infected than dog bites.',
  'length': 60},
 {'fact': 'Researchers believe th

In [129]:
req = spark.read.json(spark.sparkContext.parallelize(l))

In [130]:
req.show()

+--------------------+------+
|                fact|length|
+--------------------+------+
|According to Hebr...|   184|
|The longest livin...|   165|
|The way you treat...|   109|
|You check your ca...|   134|
|Cat's urine glows...|    38|
|Cats have 300 mil...|    58|
|Cat bites are mor...|    60|
|Researchers belie...|   212|
|Miacis, the primi...|   133|
|The smallest pedi...|   249|
+--------------------+------+

