In [5]:
from pyspark.sql import SparkSession
#Spark session with cluster mode(yarn)
spark = SparkSession.builder \
     .master("yarn") \
     .appName("Pyspark JDBC") \
     .config("spark.ui.port", "50032") \
     .config("spark.jars.packages","mysql:mysql-connector-java:5.1.44") \
     .config("spark.jars","jars/mysql-connector-java-5.1.44.jar") \
     .getOrCreate()

In [6]:
emp_mysql = spark. \
    read. \
    format("jdbc"). \
    option("url","jdbc:mysql://ms.itversity.com/retail_export"). \
    option("driver","com.mysql.jdbc.Driver"). \
    option("user","retail_user"). \
    option("password","itversity"). \
    option("dbtable","emp"). \
    load()
# spark make use of scala library/java library because it runs on jvm(spark written on scala)
# if you use python mysql package also doesn't work

In [10]:
emp_mysql.printSchema()

root
 |-- empno: decimal(4,0) (nullable = true)
 |-- ename: string (nullable = true)
 |-- job: string (nullable = true)
 |-- mgr: decimal(4,0) (nullable = true)
 |-- hiredate: date (nullable = true)
 |-- sal: decimal(7,2) (nullable = true)
 |-- comm: decimal(7,2) (nullable = true)
 |-- deptno: decimal(2,0) (nullable = true)



In [9]:
# Create table on jdbc  using indirect api
emp_mysql. \
    write.format("jdbc"). \
    option("url","jdbc:mysql://ms.itversity.com/retail_export"). \
    option("driver","com.mysql.jdbc.Driver"). \
    option("user","retail_user"). \
    option("password","itversity"). \
    option("dbtable","pyspark_created_table_emp").save()

In [13]:
# direct api
jdbc_props = { 'user' : 'retail_user', 'password' : 'itversity','Driver':"com.mysql.jdbc.Driver" }
emp_mysql.write.jdbc(url ="jdbc:mysql://ms.itversity.com/retail_export",table="pyspark_created_temp_table",
                    mode=None,properties=jdbc_props)

In [24]:
# insert into table on jdbc 
emp_mysql. \
    write.format("jdbc"). \
    option("url","jdbc:mysql://ms.itversity.com/retail_export"). \
    option("driver","com.mysql.jdbc.Driver"). \
    option("user","retail_user"). \
    option("password","itversity"). \
    mode("append"). \
    option("dbtable","pyspark_created_temp_table"). \
    save()

In [26]:
# Read Json file
emp_json = spark.read.format("json").load("data/employees.json")

In [31]:
emp_json.show()

+--------------------+--------------------+----------+------+---+-----------+--------------------+
|           addresses|               email|first_name|gender| id|  last_name|       phone_numbers|
+--------------------+--------------------+----------+------+---+-----------+--------------------+
|[[Ridgely, 21684,...|     wsmyth0@loc.gov|    Weidar|  Male|  1|      Smyth|      [550-543-4729]|
|[[Waco, 76796, Te...|   eogborn1@wisc.edu|     Emmit|  Male|  2|     Ogborn|[374-344-7772, 42...|
|[[Fort Worth, 761...|mdadswell2@edublo...|     Micah|  Male|  3|   Dadswell|[846-266-0132, 23...|
|[[Nashville, 3720...|  bcholwell3@who.int|      Berk|  Male|  4|   Cholwell|[524-533-7218, 92...|
|[[Austin, 78744, ...|hbasterfield4@int...|     Haley|  Male|  5|Basterfield|[558-470-2433, 90...|
|[[New Haven, 0652...|   sbezley5@xing.com|   Siouxie|Female|  6|     Bezley|[135-434-5299, 51...|
|[[Fort Myers, 339...| ugrewes6@drupal.org|      Ursa|Female|  7|     Grewes|[366-797-5110, 22...|
|[[Idaho F

In [28]:
emp_json.printSchema()

root
 |-- addresses: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- postal_code: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- street_name: string (nullable = true)
 |    |    |-- street_number: string (nullable = true)
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone_numbers: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [56]:
# Query json columns
from pyspark.sql import functions as f
emp_json.select('first_name',
                'last_name',
                'gender',
                'addresses.city',
                'addresses.postal_code',
                'addresses.state',
                'phone_numbers').show()

+----------+-----------+------+--------------------+--------------------+--------------------+--------------------+
|first_name|  last_name|gender|                city|         postal_code|               state|       phone_numbers|
+----------+-----------+------+--------------------+--------------------+--------------------+--------------------+
|    Weidar|      Smyth|  Male|           [Ridgely]|             [21684]|          [Maryland]|      [550-543-4729]|
|     Emmit|     Ogborn|  Male|   [Waco, Milwaukee]|      [76796, 53205]|  [Texas, Wisconsin]|[374-344-7772, 42...|
|     Micah|   Dadswell|  Male|[Fort Worth, New ...|[76162, 10009, 72...|[Texas, New York,...|[846-266-0132, 23...|
|      Berk|   Cholwell|  Male|         [Nashville]|             [37205]|         [Tennessee]|[524-533-7218, 92...|
|     Haley|Basterfield|  Male|            [Austin]|             [78744]|             [Texas]|[558-470-2433, 90...|
|   Siouxie|     Bezley|Female|[New Haven, Brea,...|[06520, 92822, 40...

In [53]:
# save os ORC format in HDFS using PartitionBy
emp_mysql.write.format("orc").partitionBy("deptno").save("output/mysql_emp_to_orc")

In [57]:
# save os ORC format in HDFS using PartitionBy multiple columns

emp_mysql.write.format("orc").partitionBy("deptno","job").save("output/mysql_emp_to_orc_partition")

In [7]:
#to print all spark configuarations
#to get application URL(DAG) use setting PROXY_URI_BASES
spark.sparkContext.getConf().getAll() 

[('spark.history.kerberos.keytab', 'none'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.initialExecutors', '2'),
 ('spark.history.ui.port', '18081'),
 ('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44'),
 ('spark.driver.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.dynamicAllocation.maxExecutors', '10'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://rm01.itversity.com:19088/proxy/application_1589064448439_13879'),
 ('spark.yarn.dist.pyFiles',
  'file:///home/rposam2020/.ivy2/jars/mysql_mysql-connector-java-5.1.44.jar'),
 ('spark.executor.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.driver.appUIAddress', 'http://gw03.itversity.com:50032'),
 ('spark.history.provider',
  'org.apache.spark.deploy.history.FsHistoryProvider'),
 ('spark.

In [4]:
#Create Spark Context
sc = spark.sparkContext

In [5]:
#Create SQL Context
sql = spark.sql

In [6]:
#Directly connect to hive Database
sql("use rposam_db")

DataFrame[]

In [7]:
#Query some table from hive database
sql("select * from emp").show()

+------+------+---------+------+----------+------+------+------+
| empno| ename|      job|   mgr|  hiredate|   sal|  comm|deptno|
+------+------+---------+------+----------+------+------+------+
|7369.0| SMITH|    CLERK|7902.0|1980-12-17| 800.0|  null|  20.0|
|7499.0| ALLEN| SALESMAN|7698.0|1981-02-20|1600.0| 300.0|  30.0|
|7521.0|  WARD| SALESMAN|7698.0|1981-02-22|1250.0| 500.0|  30.0|
|7566.0| JONES|  MANAGER|7839.0|1981-04-02|2975.0|  null|  20.0|
|7654.0|MARTIN| SALESMAN|7698.0|1981-09-28|1250.0|1400.0|  30.0|
|7698.0| BLAKE|  MANAGER|7839.0|1981-05-01|2850.0|  null|  30.0|
|7782.0| CLARK|  MANAGER|7839.0|1981-06-09|2450.0|  null|  10.0|
|7788.0| SCOTT|  ANALYST|7566.0|1982-12-09|3000.0|  null|  20.0|
|7839.0|  KING|PRESIDENT|  null|1981-11-17|5000.0|  null|  10.0|
|7844.0|TURNER| SALESMAN|7698.0|1981-09-08|1500.0|   0.0|  30.0|
|7876.0| ADAMS|    CLERK|7788.0|1983-01-12|1100.0|  null|  20.0|
|7900.0| JAMES|    CLERK|7698.0|1981-12-03| 950.0|  null|  30.0|
|7902.0|  FORD|  ANALYST|

In [36]:
emp.to_csv("output/emp.csv")

In [37]:
emp.to_excel("output/emp.xlsx")

In [38]:
emp.to_json("output/emp.json")

In [42]:
emp.to_parquet("output/emp.parquet",engine="pyarrow")

In [46]:
emp_csv = spark.read.csv("data/emp.csv")

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string]

In [22]:
##Read josn file using pandas(if you use spark then reads from HDFS) from local
pandas_df = pd.read_json("output/emp.json")

In [127]:
##Create Dataframe in Spark using Pandas DataFrame
spark_df = spark.createDataFrame(pandas_df)

In [128]:
##Registering as temp Table
spark_df.createOrReplaceTempView("empTab")

In [129]:
#Query using SparkSQL Core
sql("select * from empTab").show()

+-----+------+---------+------+----------+----+------+------+
|empno| ename|      job|   mgr|  hiredate| sal|  comm|deptno|
+-----+------+---------+------+----------+----+------+------+
| 7369| SMITH|    CLERK|7902.0|1980-12-17| 800|   NaN|    20|
| 7499| ALLEN| SALESMAN|7698.0|1981-02-20|1600| 300.0|    30|
| 7521|  WARD| SALESMAN|7698.0|1981-02-22|1250| 500.0|    30|
| 7566| JONES|  MANAGER|7839.0|1981-04-02|2975|   NaN|    20|
| 7654|MARTIN| SALESMAN|7698.0|1981-09-28|1250|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839.0|1981-05-01|2850|   NaN|    30|
| 7782| CLARK|  MANAGER|7839.0|1981-06-09|2450|   NaN|    10|
| 7788| SCOTT|  ANALYST|7566.0|1982-12-09|3000|   NaN|    20|
| 7839|  KING|PRESIDENT|   NaN|1981-11-17|5000|   NaN|    10|
| 7844|TURNER| SALESMAN|7698.0|1981-09-08|1500|   0.0|    30|
| 7876| ADAMS|    CLERK|7788.0|1983-01-12|1100|   NaN|    20|
| 7900| JAMES|    CLERK|7698.0|1981-12-03| 950|   NaN|    30|
| 7902|  FORD|  ANALYST|7566.0|1981-12-03|3000|   NaN|    20|
| 7934|M

In [8]:
#To read json file using Spark from HDFS 
df = spark.read.json("data/emp.json")

In [9]:
def convert_empdf_collection(df):
    l = []
    for row in df.collect():
        for i in range(0,len(row.empno)):
            l.append(
                (row.empno[i],
                 row.ename[i],
                 row.job[i],
                 row.mgr[i],
                 row.hiredate[i],
                 row.sal[i],
                 row.comm[i],
                 row.deptno[i])
            )
    return l

In [10]:
emp_l = convert_empdf_collection(df)
emp_df = spark.createDataFrame(emp_l,("empno","ename","job","mgr","hiredate","sal","comm","deptno"))
emp_df.show()

+-----+------+---------+------+----------+------+------+------+
|empno| ename|      job|   mgr|  hiredate|   sal|  comm|deptno|
+-----+------+---------+------+----------+------+------+------+
| 7369| SMITH|    CLERK|7902.0|1980-12-17| 800.0|  null|    20|
| 7499| ALLEN| SALESMAN|7698.0|1981-02-20|1600.0| 300.0|    30|
| 7876| ADAMS|    CLERK|7788.0|1983-01-12|1100.0|  null|    20|
| 7900| JAMES|    CLERK|7698.0|1981-12-03| 950.0|  null|    30|
| 7902|  FORD|  ANALYST|7566.0|1981-12-03|3000.0|  null|    20|
| 7934|MILLER|    CLERK|7782.0|1982-01-23|1300.0|  null|    10|
| 7521|  WARD| SALESMAN|7698.0|1981-02-22|1250.0| 500.0|    30|
| 7566| JONES|  MANAGER|7839.0|1981-04-02|2975.0|  null|    20|
| 7654|MARTIN| SALESMAN|7698.0|1981-09-28|1250.0|1400.0|    30|
| 7698| BLAKE|  MANAGER|7839.0|1981-05-01|2850.0|  null|    30|
| 7782| CLARK|  MANAGER|7839.0|1981-06-09|2450.0|  null|    10|
| 7788| SCOTT|  ANALYST|7566.0|1982-12-09|3000.0|  null|    20|
| 7839|  KING|PRESIDENT|  null|1981-11-1

In [11]:
emp_df[emp_df.deptno == 10].show()

+-----+------+---------+------+----------+------+----+------+
|empno| ename|      job|   mgr|  hiredate|   sal|comm|deptno|
+-----+------+---------+------+----------+------+----+------+
| 7934|MILLER|    CLERK|7782.0|1982-01-23|1300.0|null|    10|
| 7782| CLARK|  MANAGER|7839.0|1981-06-09|2450.0|null|    10|
| 7839|  KING|PRESIDENT|  null|1981-11-17|5000.0|null|    10|
+-----+------+---------+------+----------+------+----+------+

