In [1]:
from pyspark.sql import SparkSession
from pathlib import Path
from os.path import abspath


# Folder the CSV inputs are read from, located under the current user's home directory.
BASE_DATA_DIR = Path.home() / "Documents/PySparkCurso/download"
# Absolute form of the current working directory.
BASE_DIR = Path().resolve()

# Warehouse location for saved tables, resolved against the current working directory.
warehouse_dir = abspath("spark-warehouse")

# Local session with Hive support enabled; reuses an existing session if one is active.
spark: SparkSession = (
    SparkSession.builder.appName("streaming with spark saving postgres")
    .master("local")
    .config("spark.sql.warehouse.dir", warehouse_dir)
    .enableHiveSupport()
    .getOrCreate()
)

24/04/17 16:16:19 WARN Utils: Your hostname, IdeaPad-Gaming-3-15IHU6 resolves to a loopback address: 127.0.1.1; using 192.168.1.5 instead (on interface wlp0s20f3)
24/04/17 16:16:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/17 16:16:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [14]:
# The file is read without a header row, so column names and types are declared explicitly.
despachantes_schema = (
    "id INT, name STRING, status STRING, city STRING, sales STRING, data DATE"
)

despachantes_path = str(BASE_DATA_DIR / "despachantes.csv")
df = (
    spark.read.schema(despachantes_schema)
    .options(header=False, sep=",")
    .csv(despachantes_path)
)
# Preview the first rows to check the schema was applied as expected.
df.show(5)

+---+-------------------+------+-------------+-----+----------+
| id|               name|status|         city|sales|      data|
+---+-------------------+------+-------------+-----+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05|
+---+-------------------+------+-------------+-----+----------+
only showing top 5 rows



In [5]:
# Create the target database only when it is missing, then list all databases to confirm.
spark.sql("CREATE DATABASE IF NOT EXISTS desp;")
databases = spark.sql("SHOW DATABASES;")
databases.show()

+---------+
|namespace|
+---------+
|  default|
|     desp|
+---------+



24/04/17 16:21:43 WARN ObjectStore: Failed to get database desp, returning NoSuchObjectException
24/04/17 16:21:43 WARN ObjectStore: Failed to get database desp, returning NoSuchObjectException
24/04/17 16:21:43 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
24/04/17 16:21:43 WARN ObjectStore: Failed to get database desp, returning NoSuchObjectException


In [15]:
# Make `desp` the current database so unqualified table names resolve inside it.
use_desp_stmt = "USE desp;"
spark.sql(use_desp_stmt)

DataFrame[]

In [16]:
# List the tables of the current database before anything is saved.
tables = spark.sql("SHOW TABLES;")
tables.show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [18]:
# Persist the DataFrame as a table named "despachantes" in the current database.
# The writer's default save mode raises an error when the table already exists,
# and the table outlives the kernel in the warehouse/metastore, so re-running
# this cell would fail. "overwrite" replaces the table and keeps the cell idempotent.
df.write.mode("overwrite").saveAsTable("despachantes")

24/04/17 16:27:46 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
24/04/17 16:27:46 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
24/04/17 16:27:46 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/04/17 16:27:46 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist


In [19]:
# Confirm the newly saved table is now registered in the current database.
tables = spark.sql("SHOW TABLES;")
tables.show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|     desp|despachantes|      false|
+---------+------------+-----------+



In [21]:
# Read the saved table back through SQL and preview the first rows.
despachantes_rows = spark.sql("SELECT * FROM despachantes;")
despachantes_rows.show(5)

+---+-------------------+------+-------------+-----+----------+
| id|               name|status|         city|sales|      data|
+---+-------------------+------+-------------+-----+----------+
|  1|   Carminda Pestana| Ativo|  Santa Maria|   23|2020-08-11|
|  2|    Deolinda Vilela| Ativo|Novo Hamburgo|   34|2020-03-05|
|  3|   Emídio Dornelles| Ativo| Porto Alegre|   34|2020-02-05|
|  4|Felisbela Dornelles| Ativo| Porto Alegre|   36|2020-02-05|
|  5|     Graça Ornellas| Ativo| Porto Alegre|   12|2020-02-05|
+---+-------------------+------+-------------+-----+----------+
only showing top 5 rows



In [22]:
# Semicolon-separated file with a header row; column types are inferred from the data.
churn_path = str(BASE_DATA_DIR / "Churn.csv")
churn = (
    spark.read.options(header=True, inferSchema=True, sep=";")
    .csv(churn_path)
)
churn.show(5)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|CreditScore|Geography|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|        619|   France|Female| 42|     2|       0|            1|        1|             1|       10134888|     1|
|        608|    Spain|Female| 41|     1| 8380786|            1|        0|             1|       11254258|     0|
|        502|   France|Female| 42|     8| 1596608|            3|        1|             0|       11393157|     1|
|        699|   France|Female| 39|     1|       0|            2|        0|             0|        9382663|     0|
|        850|    Spain|Female| 43|     2|12551082|            1|        1|             1|         790841|     0|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+

In [23]:
# Save the churn data as a table partitioned by the "Geography" column.
# The default save mode errors out when "churn_geo" already exists, which breaks
# any re-run of this cell; "overwrite" replaces the table instead.
churn.write.mode("overwrite").partitionBy("Geography").saveAsTable("churn_geo")

In [25]:
# Confirm the partitioned table shows up next to the earlier one.
tables = spark.sql("SHOW TABLES;")
tables.show()

+---------+------------+-----------+
|namespace|   tableName|isTemporary|
+---------+------------+-----------+
|     desp|   churn_geo|      false|
|     desp|despachantes|      false|
+---------+------------+-----------+



In [26]:
# Read the partitioned table back and preview the first rows.
churn_geo_rows = spark.sql("SELECT * FROM churn_geo;")
churn_geo_rows.show(5)

+-----------+------+---+------+--------+-------------+---------+--------------+---------------+------+---------+
|CreditScore|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|Geography|
+-----------+------+---+------+--------+-------------+---------+--------------+---------------+------+---------+
|        619|Female| 42|     2|       0|            1|        1|             1|       10134888|     1|   France|
|        502|Female| 42|     8| 1596608|            3|        1|             0|       11393157|     1|   France|
|        699|Female| 39|     1|       0|            2|        0|             0|        9382663|     0|   France|
|        822|  Male| 50|     7|       0|            2|        1|             1|         100628|     0|   France|
|        501|  Male| 44|     4|14205107|            2|        0|             1|         749405|     0|   France|
+-----------+------+---+------+--------+-------------+---------+--------------+---------------+------+---------+

Bucketing (use it when the column's cardinality is high)

In [27]:
# Save the churn data as a table bucketed into 3 buckets by the "Geography" column.
# The default save mode errors out when "churn_geo_2" already exists, which breaks
# any re-run of this cell; "overwrite" replaces the table instead.
churn.write.mode("overwrite").bucketBy(3, "Geography").saveAsTable("churn_geo_2")