# Spark
* https://github.com/apache/spark
* 配置: https://spark.apache.org/docs/latest/configuration.html
* PySpark: https://spark.apache.org/docs/latest/api/python/getting_started/index.html

actions:
- https://github.com/jargonzhou/application-store/tree/main/data-engineering/spark

In [None]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable
[33mDEPRECATION: Loading egg at /home/zhoujiagen/.local/lib/python3.12/site-packages/pjsua2-2.15.1-py3.12-linux-x86_64.egg is deprecated. pip 25.1 will enforce this behaviour change. A possible replacement is to use pip for package installation. Discussion can be found at https://github.com/pypa/pip/issues/12330[0m[33m
[0mCollecting pyspark
  Downloading pyspark-3.5.5.tar.gz (317.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.8/317.2 MB[0m [31m15.5 kB/s[0m eta [36m5:39:20[0m

In [None]:
# Spark SQL
!pip install pyspark[sql]
# pandas API on Spark
# to plot your data, you can install plotly together.
!pip install pyspark[pandas_on_spark] plotly  
# Spark Connect
!pip install pyspark[connect]

In [2]:
import pyspark
# Display the version string of the imported pyspark package.
pyspark.__version__

'3.5.4'

# DataFrame

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) a session named 'spike-app' against the master at
# spark://localhost:7077. Extra settings can be chained into the builder with
# .config("spark.some.config.option", "some-value").
spark = (
    SparkSession.builder
    .appName('spike-app')
    .master("spark://localhost:7077")
    .getOrCreate()
)

In [2]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

# Alternative kept for reference: build the frame from Row objects and let
# Spark infer the schema.
# df = spark.createDataFrame([
#     Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
#     Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
#     Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
# ])
# df

# With an explicit schema: plain tuples plus a DDL-style schema string.
sample_rows = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(
    sample_rows,
    schema='a long, b double, c string, d date, e timestamp',
)

# Print the rows first, then the column types.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [6]:
# %env

# 'SPARK_AUTH_SOCKET_TIMEOUT': '15',
#  'SPARK_BUFFER_SIZE': '65536'

In [4]:
# Stop the 'spike-app' session before the next section creates its own.
spark.stop()

# Spark Connect

```shell
# example: in master
/opt/bitnami/spark$ sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.5
starting org.apache.spark.sql.connect.service.SparkConnectServer, logging to /opt/bitnami/spark/logs/spark--org.apache.spark.sql.connect.service.SparkConnectServer-1-724dbdc1d558.out
```

In [None]:
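# %env SPARK_REMOTE=sc://localhost:15002
# (leave the value unquoted: the recorded output of this cell shows %env keeping the quotes as part of the value)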

env: SPARK_REMOTE="sc://localhost:15002"


In [7]:
from pyspark.sql import SparkSession

# Build a Spark Connect session against the server at sc://localhost:15002.
# Extra settings can be chained into the builder with
# .config("spark.some.config.option", "some-value").
spark = (
    SparkSession.builder
    .appName('spike-connect-app')
    .remote("sc://localhost:15002")
    .getOrCreate()
)

In [8]:
from datetime import datetime, date

# Same sample data as in the DataFrame section, created through Spark Connect.
# NOTE(review): the recorded output of this cell shows 04:00:00 where the
# non-Connect run showed 12:00:00 - presumably a client/server time-zone
# difference; confirm (e.g. check spark.sql.session.timeZone on the server).
records = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(
    records,
    schema='a long, b double, c string, d date, e timestamp',
)
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 04:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 04:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 04:00:00|
+---+---+-------+----------+-------------------+



In [12]:
# %env

#  'SPARK_AUTH_SOCKET_TIMEOUT': '15',
#  'SPARK_BUFFER_SIZE': '65536',
#  'SPARK_CONNECT_MODE_ENABLED': '1'

In [14]:
# Stop the 'spike-connect-app' session.
spark.stop()

In [None]:
# Important: clean the environment. Remove SPARK_CONNECT_MODE_ENABLED from
# this process's environment if it is present; do nothing otherwise.
import os

try:
    del os.environ['SPARK_CONNECT_MODE_ENABLED']
except KeyError:
    # The variable was not set, so there is nothing to clean up.
    pass

In [None]:
# Display the currently active session, if any, after the stop and environment cleanup.
SparkSession.getActiveSession()

# Pandas API

In [18]:
# Set before pyspark.pandas is imported in the next cell.
# NOTE(review): presumably this avoids the PyArrow time-zone warning of pandas-on-Spark - confirm against its docs.
%env PYARROW_IGNORE_TIMEZONE=1

env: PYARROW_IGNORE_TIMEZONE=1


In [19]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Session used by the pandas-on-Spark examples in this section.
spark = (
    SparkSession.builder
    .appName('spike-pandas-app')
    .master("spark://localhost:7077")
    .getOrCreate()
)

In [20]:
# pandas-on-Spark Series built from a Python list that contains one NaN.
ps.Series([1, 3, 5, np.nan, 6, 8])

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [21]:
# pandas-on-Spark DataFrame with three columns and an explicit integer index.
ps.DataFrame(
    {
        'a': [1, 2, 3, 4, 5, 6],
        'b': [100, 200, 300, 400, 500, 600],
        'c': ["one", "two", "three", "four", "five", "six"],
    },
    index=[10, 20, 30, 40, 50, 60],
)

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [22]:
# Stop the 'spike-pandas-app' session.
spark.stop()

# I want to ...

- read MySQL Table
- read Kafka
- read Redis
- read Elasticsearch

## MySQL

In [1]:
# !pip install findspark

In [1]:
# example
# https://github.com/apache/spark/blob/master/examples/src/main/python/sql/datasource.py#L380
#
# datasource options
# https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option

# import findspark
# findspark.init()
from pyspark.sql import SparkSession

# Session for the MySQL examples. The two .config() entries point Spark at the
# MySQL Connector/J jar. Alternative values kept for reference:
#   .config('spark.jars', "C:/Users/zhouj/.m2/repository/com/mysql/mysql-connector-j/8.0.32/mysql-connector-j-8.0.32.jar")
#   .config('spark.driver.extraClassPath', "/opt/bitnami/spark/jars/mysql-connector-j-8.0.32.jar")
spark = (
    SparkSession.builder
    .appName('spike-mysql-app')
    .master("spark://localhost:7077")
    .config('spark.jars', "/opt/bitnami/spark/jars/mysql-connector-j-8.0.32.jar")
    .config('spark.driver.extraClassPath', "mysql-connector-j-8.0.32.jar")
    .getOrCreate()
)

### read

In [18]:
import os

# sql.SparkSession.read
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.read.html

# Connection settings, defined once and shared by both reads below.
# Credentials are taken from the environment when MYSQL_USER / MYSQL_PASSWORD
# are set; the fallbacks are the placeholder values this notebook used before.
jdbc_url = "jdbc:mysql://192.168.3.178:3306/devops"
jdbc_user = os.environ.get("MYSQL_USER", "root")
jdbc_password = os.environ.get("MYSQL_PASSWORD", "change_me")

# Variant 1: the generic reader with format("jdbc") and individual options.
jdbcDF = (
    spark.read
    .format("jdbc")
    .option("url", jdbc_url)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "users")
    .option("user", jdbc_user)
    .option("password", jdbc_password)
    .load()
)
jdbcDF.show()

# Variant 2: the jdbc() shortcut. `predicates` is a list of WHERE-clause
# expressions, so only rows with id > 2 are read here.
(
    spark.read
    .jdbc(
        url=jdbc_url,
        table='users',
        predicates=['id > 2'],
        properties={'user': jdbc_user, 'password': jdbc_password},
    )
    .show()
)

+---+----------------+--------+
| id|           email|password|
+---+----------------+--------+
|  1|xxx1@example.com|    pwd1|
|  2|xxx2@example.com|    pwd2|
|  3|xxx3@example.com|    pwd3|
|  4|xxx4@example.com|    pwd4|
+---+----------------+--------+

+---+----------------+--------+
| id|           email|password|
+---+----------------+--------+
|  3|xxx3@example.com|    pwd3|
|  4|xxx4@example.com|    pwd4|
+---+----------------+--------+



In [19]:
# DataFrame上过滤
jdbcDF.filter(jdbcDF['id'] > 2).show()

+---+----------------+--------+
| id|           email|password|
+---+----------------+--------+
|  3|xxx3@example.com|    pwd3|
|  4|xxx4@example.com|    pwd4|
+---+----------------+--------+



### write

In [None]:
from pyspark.sql import Row

# DataFrame.write
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.write.html#pyspark.sql.DataFrame.write
new_user = Row(id=4, email='xxx@example.com', password='pwd')
df = spark.createDataFrame([new_user])

# Append mode. Original note: a duplicate primary key exists (the recorded
# read output earlier in this section already lists id=4).
(
    df.write
    .format("jdbc")
    .option("url", "jdbc:mysql://192.168.3.178:3306/devops")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "users")
    .option("user", "root")
    .option("password", "change_me")
    .mode('append')
    .save()
)

In [23]:
# Imported in this cell so it does not depend on another cell having been run
# first to bring Row into scope.
from pyspark.sql import Row

# The four rows that should make up the whole `users` table after the write.
df = spark.createDataFrame([
    Row(id=1, email='xxx1@example.com', password='pwd1'),
    Row(id=2, email='xxx2@example.com', password='pwd2'),
    Row(id=3, email='xxx3@example.com', password='pwd3'),
    Row(id=4, email='xxx4@example.com', password='pwd4'),
])

df.write.jdbc(
    url="jdbc:mysql://192.168.3.178:3306/devops",
    table='users',
    # mode='ignore',
    mode='overwrite',  # replace the entire table contents
    # mode='append',  # append: a duplicate primary key exists
    properties={'user': 'root', 'password': 'change_me'},
)

In [24]:
# Read the `users` table back and print it to check the result of the write.
users_df = spark.read.jdbc(
    url="jdbc:mysql://192.168.3.178:3306/devops",
    table='users',
    properties={'user': 'root', 'password': 'change_me'},
)
users_df.show()

+---+----------------+--------+
| id|           email|password|
+---+----------------+--------+
|  1|xxx1@example.com|    pwd1|
|  2|xxx2@example.com|    pwd2|
|  3|xxx3@example.com|    pwd3|
|  4|xxx4@example.com|    pwd4|
+---+----------------+--------+



### sql

In [6]:
# NOTE(review): presumably a workaround for the NativeIO$Windows UnsatisfiedLinkError mentioned in the next cell - confirm.
%env HADOOP_HOME=.

env: HADOOP_HOME=.


In [6]:
# sql.SparkSession.sql
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.sql.html
#
# SQL syntax
# https://spark.apache.org/docs/latest/sql-ref-syntax.html

# Why is this needed?
# : java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
# https://sparkbyexamples.com/spark/spark-hadoop-exception-in-thread-main-java-lang-unsatisfiedlinkerror-org-apache-hadoop-io-nativeio-nativeiowindows-access0ljava-lang-stringiz/

# spark.sql("""
# CREATE TABLE s_users(id BIGINT, email STRING, password STRING) USING JDBC
# OPTIONS (
#     'url'='jdbc:mysql://192.168.3.178:3306/devops',
#     'driver'='com.mysql.cj.jdbc.Driver',
#     'dbtable'='users',
#     'user'='root',
#     'password'='change_me' 
# )
# COMMENT 'table from devops.users'
# """).show()

# play in spark-sql
# 
# spark-sql (default)> CREATE TABLE s_users(id BIGINT, email STRING, password STRING) USING JDBC
#                    > OPTIONS (
#                    >     'url'='jdbc:mysql://192.168.3.178:3306/devops',
#                    >     'driver'='com.mysql.cj.jdbc.Driver',
#                    >     'dbtable'='users',
#                    >     'user'='root',
#                    >     'password'='change_me' 
#                    > )
#                    > COMMENT 'table from devops.users';

# spark-sql (default)> select * from s_users;
# 1       xxx1@example.com        pwd1
# 2       xxx2@example.com        pwd2
# 3       xxx3@example.com        pwd3
# 4       xxx4@example.com        pwd4
# Time taken: 1.079 seconds, Fetched 4 row(s)

In [7]:
# spark.sql("SELECT * FROM s_users").show()

In [8]:
# Stop the 'spike-mysql-app' session.
spark.stop()

## Redis
* https://github.com/RedisLabs/spark-redis/blob/master/doc/python.md

## Machine Learning
* https://www.machinelearningplus.com/pyspark/what-is-sparksession/