In [5]:
conf = SparkConf(). \
    setMaster("local"). \
    setAppName("Orders_Revenue"). \
    set("conf.ui.port","10567")

In [4]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

In [7]:
sc = SparkContext(conf=conf)



22/11/21 15:25:33 WARN Utils: Your hostname, codeStation resolves to a loopback address: 127.0.1.1; using 192.168.160.83 instead (on interface wlo1)
22/11/21 15:25:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/21 15:25:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
productPath = "/home/solverbot/spark-warehouse/retail_db/products/part-00000"
orderitemPath = "/home/solverbot/spark-warehouse/retail_db/order_items/part-00000"
ordersPath = "/home/solverbot/spark-warehouse/retail_db/orders/part-00000.txt"

In [12]:
#What is the difference between Session and Context?
spark = SparkSession.builder.appName('newSession').getOrCreate()

In [13]:
orderDF = spark.read.csv("/home/solverbot/spark-warehouse/retail_db/order_items/") \
                    .toDF("order_item_id","order_item_order_id","product_id", "qty","product_cost","order_subtotal")
orderDF.head(2)

[Row(order_item_id='1', order_item_order_id='1', product_id='957', qty='1', product_cost='299.98', order_subtotal='299.98'),
 Row(order_item_id='2', order_item_order_id='2', product_id='1073', qty='1', product_cost='199.99', order_subtotal='199.99')]

In [14]:
type(orderDF)

pyspark.sql.dataframe.DataFrame

In [15]:
open(orderitemPath).read().splitlines()[0]

'1,1,957,1,299.98,299.98'

In [16]:
sc.textFile(orderitemPath).take(2)

                                                                                

['1,1,957,1,299.98,299.98', '2,2,1073,1,199.99,199.99']

In [18]:
#Note, even the numbers are read as strings when the DF is created
orderDF.printSchema()

root
 |-- order_item_id: string (nullable = true)
 |-- order_item_order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- qty: string (nullable = true)
 |-- product_cost: string (nullable = true)
 |-- order_subtotal: string (nullable = true)



In [19]:
orderDF.select("order_item_order_id","order_subtotal").show()

+-------------------+--------------+
|order_item_order_id|order_subtotal|
+-------------------+--------------+
|                  1|        299.98|
|                  2|        199.99|
|                  2|          50.0|
|                  2|        129.99|
|                  4|         24.99|
|                  4|         59.99|
|                  4|          50.0|
|                  4|         49.98|
|                  5|        299.98|
|                  5|         59.99|
|                  5|         49.98|
|                  5|        299.98|
|                  5|        129.99|
|                  7|        199.99|
|                  7|        299.98|
|                  7|         15.99|
|                  8|         59.99|
|                  8|         59.99|
|                  8|         49.98|
|                  8|          50.0|
+-------------------+--------------+
only showing top 20 rows



In [None]:
orderDF.describe().show()

In [None]:
#Interesting option to convert DF to SQL table

orderDF.createTempView("ordersView")

In [None]:
spark.sql("SELECT * FROM ordersView LIMIT 5").show()

In [25]:
#The sparksession contains the spark context. Using the below command the context can be 
#invoked, and the details of the runs can be checked
spark.sparkContext

In [26]:
orderDFInfer = spark.read.csv("/home/solverbot/spark-warehouse/retail_db/order_items/",inferSchema=True)

                                                                                

In [27]:
#Recollected that inferSchema can be used to get the types of the columns correct
orderDFInfer.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)



In [32]:
from pyspark.sql.types import IntegerType

orderDF = orderDF \
            .withColumn('order_item_id',orderDF.order_item_id.cast(IntegerType())) \
            .withColumn('order_item_order_id',orderDF.order_item_order_id.cast(IntegerType()))

In [33]:
orderDF.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- qty: string (nullable = true)
 |-- product_cost: string (nullable = true)
 |-- order_subtotal: string (nullable = true)



### Integrating the database connectivity using jdbc connectors

In [5]:
sparkSQL = SparkSession.builder \
       .appName("Python Spark SQL basic example") \
       .config("spark.jars", "/usr/share/java/postgresql-42.2.26.jar") \
       .getOrCreate()

22/11/21 16:23:15 WARN Utils: Your hostname, codeStation resolves to a loopback address: 127.0.1.1; using 192.168.160.83 instead (on interface wlo1)
22/11/21 16:23:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/11/21 16:23:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
orders_df = sparkSQL.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/postgres") \
    .option("dbtable", "orders") \
    .option("user", "postgres") \
    .option("password", 1234) \
    .option("driver", "org.postgresql.Driver") \
    .load()

In [9]:
type(orders_df)

pyspark.sql.dataframe.DataFrame

In [10]:
orders_df.take(2)

[Row(order_id=1, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=11599, order_status='CLOSED'),
 Row(order_id=2, order_date=datetime.datetime(2013, 7, 25, 0, 0), order_customer_id=256, order_status='PENDING_PAYMENT')]

In [11]:
orders_df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [12]:
orders_df.show(2)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+
only showing top 2 rows



In [36]:
url = "jdbc:postgresql://localhost/"

In [32]:
help(sparkSQL.read.jdbc)

Help on method jdbc in module pyspark.sql.readwriter:

jdbc(url: str, table: str, column: Optional[str] = None, lowerBound: Union[str, int, NoneType] = None, upperBound: Union[str, int, NoneType] = None, numPartitions: Optional[int] = None, predicates: Optional[List[str]] = None, properties: Optional[Dict[str, str]] = None) -> 'DataFrame' method of pyspark.sql.readwriter.DataFrameReader instance
    Construct a :class:`DataFrame` representing the database table named ``table``
    accessible via JDBC URL ``url`` and connection ``properties``.
    
    Partitions of the table will be retrieved in parallel if either ``column`` or
    ``predicates`` is specified. ``lowerBound``, ``upperBound`` and ``numPartitions``
    is needed when ``column`` is specified.
    
    If both ``column`` and ``predicates`` are specified, ``column`` will be used.
    
    .. versionadded:: 1.4.0
    
    Parameters
    ----------
    table : str
        the name of the table
    column : str, optional
      

In [39]:
orders_df.write.csv('ordersdb')

                                                                                

In [44]:
#Remember to give the alias to the table that is created with select
ordersQuery = sparkSQL.read.format('jdbc') \
        .option('url',"jdbc:postgresql://localhost/postgres") \
        .option('dbtable','(SELECT * FROM orders LIMIT 5) q') \
        .option('user','postgres') \
        .option('password',1234) \
        .option("driver", "org.postgresql.Driver") \
        .load()

In [45]:
ordersQuery.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
+--------+-------------------+-----------------+---------------+



In [46]:
import pyspark.sql.functions as sf

In [48]:
from pyspark.sql.functions import *

In [49]:
orderitems= sparkSQL.read.format('jdbc') \
        .option('url',"jdbc:postgresql://localhost/postgres") \
        .option('dbtable','order_items') \
        .option('user','postgres') \
        .option('password',1234) \
        .option("driver", "org.postgresql.Driver") \
        .load()

In [51]:
orderitems.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: double (nullable = true)
 |-- order_item_product_price: double (nullable = true)



In [54]:
orderitems.select("order_item_quantity", "order_item_subtotal").show()

+-------------------+-------------------+
|order_item_quantity|order_item_subtotal|
+-------------------+-------------------+
|                  1|             299.98|
|                  1|             199.99|
|                  5|              250.0|
|                  1|             129.99|
|                  2|              49.98|
|                  5|             299.95|
|                  3|              150.0|
|                  4|             199.92|
|                  1|             299.98|
|                  5|             299.95|
|                  2|              99.96|
|                  1|             299.98|
|                  1|             129.99|
|                  1|             199.99|
|                  1|             299.98|
|                  5|              79.95|
|                  3|             179.97|
|                  5|             299.95|
|                  4|             199.92|
|                  1|               50.0|
+-------------------+-------------

In [55]:
productTable = sparkSQL.read.format('jdbc') \
        .option('url',"jdbc:postgresql://localhost/postgres") \
        .option('dbtable','products') \
        .option('user','postgres') \
        .option('password',1234) \
        .option("driver", "org.postgresql.Driver") \
        .load()

In [56]:
productTable.show(2)

+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|product_id|product_category_id|        product_name|product_description|product_price|       product_image|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
|         1|                  2|Quest Q64 10 FT. ...|                   |        59.98|http://images.acm...|
|         2|                  2|Under Armour Men'...|                   |       129.99|http://images.acm...|
+----------+-------------------+--------------------+-------------------+-------------+--------------------+
only showing top 2 rows



In [61]:
orders_df.selectExpr("CASE WHEN order_status IN ('COMPLETE','CLOSED') THEN 'COMPLETED' ELSE 'PENDING' END").show(2)

+------------------------------------------------------------------------------+
|CASE WHEN (order_status IN (COMPLETE, CLOSED)) THEN COMPLETED ELSE PENDING END|
+------------------------------------------------------------------------------+
|                                                                     COMPLETED|
|                                                                       PENDING|
+------------------------------------------------------------------------------+
only showing top 2 rows

