In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark import StorageLevel

In [3]:
from pyspark.sql import Window

In [4]:
from pyspark.sql.functions import regexp_extract, col
from pyspark.sql.functions import *

In [5]:
spark=SparkSession.builder\
                  .master("local")\
                  .appName('new')\
                  .getOrCreate()
sc=spark.sparkContext

22/11/01 10:12:48 WARN Utils: Your hostname, ashwins-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.29.199 instead (on interface en0)
22/11/01 10:12:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/01 10:12:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
#1. reading the data - Reader API
#2. crunching of data - transformations
#3. write the data back - Writer API

In [4]:
orderDf = spark.read.format("csv")\
         .option("header",True)\
         .option("inferSchema",True)\
         .option("path","orders-201019-002101.csv")\
         .load()


In [7]:
orderDf.write.format("csv")\
.mode("overwrite")\
.option("path","/Users/ashwinpandey/Desktop/folder")\
.save()

In [6]:
print("number of partitions are ", orderDf.rdd.getNumPartitions())
ordersRep = orderDf.repartition(4)


number of partitions are  1


In [7]:
ordersRep.write.format("csv")\
         .mode("overwrite")\
         .option("path","/Users/ashwinpandey/Desktop/folder")\
         .save()

In [9]:
#==========
#saving option
# 1- overwrite
# 2- append
# 3- errorIfExists
# 4- ignore

In [10]:
#=====
#Parquet is the default file format in apache spark when we talk about structured api's
#=====Spark File LayoutNumber of files is equal to number of partitions.
# 1. simple repartition - repartition
# 2. partitioning - partitionBy (allows partitioning pruning)
# 3. bucketBy
# 4. maxRecordsPerFile

In [11]:

orderDf = spark.read.format("csv")\
        .option("header",True)\
        .option("inferSchema",True)\
        .option("path","orders-201019-002101.csv")\
        .load()


In [12]:
orderDf.write.format("csv").partitionBy("order_status")\
       .mode("overwrite")\
       .option("path","/Users/ashwinpandey/Desktop/folder")\
       .save()


In [13]:
# AVRO

In [None]:
orderDf = spark.read.format("csv")\
         .option("header",True)\
         .option("inferSchema",True)\
         .option("path","/Users/trendytech/Desktop/data/orders.csv")\
         .load()

In [None]:
orderDf.write.format("avro")\
.mode("overwrite")\.option("path","/Users/trendytech/Desktop/newfolder4")\.save()


In [14]:
# Creating a table from data

In [15]:
orderDf = spark.read.format("csv")\
         .option("header",True)\
         .option("inferSchema",True)\
         .option("path","orders-201019-002101.csv")\
         .load()


In [16]:
orderDf.createOrReplaceTempView("orders")


In [18]:
resultDf = spark.sql("select order_status, count(*) as total_orders from orders group by order_status")
resultDf.show()

+---------------+------------+
|   order_status|total_orders|
+---------------+------------+
|PENDING_PAYMENT|       15030|
|       COMPLETE|       22900|
|        ON_HOLD|        3798|
| PAYMENT_REVIEW|         729|
|     PROCESSING|        8275|
|         CLOSED|        7556|
|SUSPECTED_FRAUD|        1558|
|        PENDING|        7610|
|       CANCELED|        1428|
+---------------+------------+



In [23]:
resultDf1 = spark.sql("select order_customer_id, count(*) as total_orders from orders where order_status = 'CLOSED' group by order_customer_id order by total_orders desc")
resultDf1.show()


+-----------------+------------+
|order_customer_id|total_orders|
+-----------------+------------+
|             1833|           6|
|             1687|           5|
|             1363|           5|
|             5493|           5|
|             2236|           4|
|            10018|           4|
|             3631|           4|
|            12431|           4|
|             7879|           4|
|             5319|           4|
|             2774|           4|
|            10263|           4|
|             4997|           4|
|             1443|           4|
|              569|           4|
|             9213|           4|
|             4573|           4|
|             4588|           4|
|            10111|           4|
|             7948|           4|
+-----------------+------------+
only showing top 20 rows



In [24]:
#Table has 2 parts
#1. data - warehouse - spark.sql.warehouse.dir
#2. metadata - catalog metastore - memory

In [25]:
orderDf.write.format("csv")\
       .mode("overwrite")\
       .saveAsTable("orders1")

In [26]:
#creating a database

In [29]:
orderDf = spark.read.format("csv")\
         .option("header",True)\
         .option("inferSchema",True)\
         .option("path","orders-201019-002101.csv")\
         .load()
spark.sql("create database if not exists retail")


DataFrame[]

In [30]:
orderDf.write.format("csv")\
       .mode("overwrite")\
       .saveAsTable("retail.orders2")

In [31]:
orderDf.write.format("csv")\
       .mode("overwrite")\
       .saveAsTable("retail.orders3")

In [34]:
spark.sql("create database if not exists retail")

DataFrame[]

In [36]:
orderDf.write.format("csv")\
       .bucketBy(4,"order_customer_id")\
       .sortBy("order_customer_id")\
       .saveAsTable("retail.orders4")


In [4]:
# Readind the un-structured data using regular expression

In [5]:
myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'
lines_df = spark.read.text("orders_new-201019-002101.csv")
#lines_df.printSchema()#lines_df.show()


In [8]:
final_df =lines_df.select(regexp_extract('value',myregex,1).alias("order_id"),regexp_extract('value',myregex,2).alias("date"),regexp_extract('value',myregex,3).alias("customer_id"),regexp_extract('value',myregex,4).alias("status"))
final_df.printSchema()
final_df.show()


root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)

+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+



In [9]:
final_df.select("order_id").show()

+--------+
|order_id|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
+--------+



In [10]:
final_df.groupby("status").count().show()

+---------------+-----+
|         status|count|
+---------------+-----+
|PENDING_PAYMENT|    3|
|       COMPLETE|    4|
| PAYMENT_REVIEW|    1|
|     PROCESSING|    1|
|         CLOSED|    3|
+---------------+-----+



In [11]:
#Column String
#Column object

In [13]:
orderDf = spark.read.format("csv")\
         .option("header",True)\
         .option("inferSchema",True)\
         .option("path","orders-201019-002101.csv")\
         .load()
orderDf.select("order_id","order_date").show()
orderDf.select(col("order_id")).show()


+--------+-------------------+
|order_id|         order_date|
+--------+-------------------+
|       1|2013-07-25 00:00:00|
|       2|2013-07-25 00:00:00|
|       3|2013-07-25 00:00:00|
|       4|2013-07-25 00:00:00|
|       5|2013-07-25 00:00:00|
|       6|2013-07-25 00:00:00|
|       7|2013-07-25 00:00:00|
|       8|2013-07-25 00:00:00|
|       9|2013-07-25 00:00:00|
|      10|2013-07-25 00:00:00|
|      11|2013-07-25 00:00:00|
|      12|2013-07-25 00:00:00|
|      13|2013-07-25 00:00:00|
|      14|2013-07-25 00:00:00|
|      15|2013-07-25 00:00:00|
|      16|2013-07-25 00:00:00|
|      17|2013-07-25 00:00:00|
|      18|2013-07-25 00:00:00|
|      19|2013-07-25 00:00:00|
|      20|2013-07-25 00:00:00|
+--------+-------------------+
only showing top 20 rows

+--------+
|order_id|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
|      13|
|      14|
|      15|
|      16|
|      17|
|      18|

In [14]:
#Creating our own user defined function is spark.
#1. Column object expression -- the function won't be registered in catalog
#2. SQL expression -- the function will be registered in catalog.So in this case we can even use it with spark SQL.

In [15]:
#if the age is greater than 18 we have to populate the 4th column named 
#Adult with "Y"else we need to populated the column with "N"

In [16]:
df = spark.read.format("csv")\
    .option("inferSchema",True)\
    .option("path","-201025-223502.dataset1")\
    .load()
     

In [17]:
df.show()

+-------+---+---------+
|    _c0|_c1|      _c2|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [20]:
df1 = df.toDF("name","age","city")
def ageCheck(age):
    if(age > 18):
        return "Y"
    else:
        return "N"
parseAgeFunction = udf(ageCheck,StringType())
df2 = df1.withColumn("adult",parseAgeFunction("age"))
df2.printSchema()
df2.show()
        

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- adult: string (nullable = true)



[Stage 12:>                                                         (0 + 1) / 1]

+-------+---+---------+-----+
|   name|age|     city|adult|
+-------+---+---------+-----+
|  sumit| 30|bangalore|    Y|
|  kapil| 32|hyderabad|    Y|
|sathish| 16|  chennai|    N|
|   ravi| 39|bangalore|    Y|
| kavita| 12|hyderabad|    N|
|  kavya| 19|   mysore|    Y|
+-------+---+---------+-----+



                                                                                

In [22]:
spark.udf.register("parseAgeFunction",ageCheck,StringType())
for x in spark.catalog.listFunctions():
    print(x)
    

22/10/29 13:06:54 WARN SimpleFunctionRegistry: The function parseagefunction replaced a previously registered function.
Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True)
Function(name='%', description=None, className='org.apache.spark.sql.catalyst.expressions.Remainder', isTemporary=True)
Function(name='&', description=None, className='org.apache.spark.sql.catalyst.expressions.BitwiseAnd', isTemporary=True)
Function(name='*', description=None, className='org.apache.spark.sql.catalyst.expressions.Multiply', isTemporary=True)
Function(name='+', description=None, className='org.apache.spark.sql.catalyst.expressions.Add', isTemporary=True)
Function(name='-', description=None, className='org.apache.spark.sql.catalyst.expressions.Subtract', isTemporary=True)
Function(name='/', description=None, className='org.apache.spark.sql.catalyst.expressions.Divide', isTemporary=True)
Function(name='<', description=None, className='org.apac

In [25]:
df2 = df1.withColumn("adult",expr("parseAgeFunction(age)"))
df2.show()

+-------+---+---------+-----+
|   name|age|     city|adult|
+-------+---+---------+-----+
|  sumit| 30|bangalore|    Y|
|  kapil| 32|hyderabad|    Y|
|sathish| 16|  chennai|    N|
|   ravi| 39|bangalore|    Y|
| kavita| 12|hyderabad|    N|
|  kavya| 19|   mysore|    Y|
+-------+---+---------+-----+



In [26]:
#create the spark session
#create a local list
#create a dataframe from this local list and give column names
#add a new column date1 with unix timestamp
#add one more column with monotonically increasing id

#drop the duplicates based on combination of 2 columns
#drop the orderid column
#sort based on order date

In [5]:
myList = [(1,"2013-07-25",11599,"CLOSED"),
          (2,"2014-07-25",256,"PENDING_PAYMENT"),
          (3,"2013-07-25",11599,"COMPLETE"),
          (4,"2019-07-25",8827,"CLOSED")]


In [6]:
ordersDf = spark.createDataFrame(myList)\
                .toDF("orderid","orderdate","customerid","status")


In [7]:
newDf = ordersDf\
       .withColumn("date1",unix_timestamp(col("orderdate"))) \
       .withColumn("newid", monotonically_increasing_id()) \
       .dropDuplicates(["orderdate","customerid"])\
       .drop("orderid")\
       .sort("orderdate")

In [8]:
ordersDf.printSchema()
ordersDf.show()
newDf.show()


root
 |-- orderid: long (nullable = true)
 |-- orderdate: string (nullable = true)
 |-- customerid: long (nullable = true)
 |-- status: string (nullable = true)



                                                                                

+-------+----------+----------+---------------+
|orderid| orderdate|customerid|         status|
+-------+----------+----------+---------------+
|      1|2013-07-25|     11599|         CLOSED|
|      2|2014-07-25|       256|PENDING_PAYMENT|
|      3|2013-07-25|     11599|       COMPLETE|
|      4|2019-07-25|      8827|         CLOSED|
+-------+----------+----------+---------------+

+----------+----------+---------------+-----+-----+
| orderdate|customerid|         status|date1|newid|
+----------+----------+---------------+-----+-----+
|2013-07-25|     11599|         CLOSED| null|    0|
|2014-07-25|       256|PENDING_PAYMENT| null|    1|
|2019-07-25|      8827|         CLOSED| null|    3|
+----------+----------+---------------+-----+-----+



# Aggregate transformations

In [11]:
#1. Simple aggregations
#2. Grouping aggregations
#3. window aggregates

# simple aggregates

In [13]:
invoiceDF = spark.read.format("csv")\
           .option("header",True)\
           .option("inferSchema",True)\
           .option("path","order_data-201025-223502.csv")\
           .load()
#column object expression
    

                                                                                

In [16]:
invoiceDF.select(count("*").alias("RowCount"),
                 sum("Quantity").alias("TotalQuantity"),
                 avg("UnitPrice").alias("AvgPrice"),
                 countDistinct("InvoiceNo").alias("CountDistinct"))\
                 .show()

+--------+-------------+-----------------+-------------+
|RowCount|TotalQuantity|         AvgPrice|CountDistinct|
+--------+-------------+-----------------+-------------+
|  541782|      5175855|4.611565323321897|        25858|
+--------+-------------+-----------------+-------------+



[Stage 9:>                                                          (0 + 1) / 1]                                                                                

In [18]:
#column string expression
invoiceDF.selectExpr("count(*) as RowCount","sum(Quantity) as TotalQuantity",
                     "avg(UnitPrice) as AvgPrice",
                     "count(Distinct(InvoiceNo)) as CountDistinct")\
                     .show()

+--------+-------------+-----------------+-------------+
|RowCount|TotalQuantity|         AvgPrice|CountDistinct|
+--------+-------------+-----------------+-------------+
|  541782|      5175855|4.611565323321897|        25858|
+--------+-------------+-----------------+-------------+



In [21]:
#spark SQL
invoiceDF.createOrReplaceTempView("sales")
spark.sql("select count(*),sum(Quantity),avg(UnitPrice),count(distinct(InvoiceNo)) from sales").show()


+--------+-------------+-----------------+-------------------------+
|count(1)|sum(Quantity)|   avg(UnitPrice)|count(DISTINCT InvoiceNo)|
+--------+-------------+-----------------+-------------------------+
|  541782|      5175855|4.611565323321897|                    25858|
+--------+-------------+-----------------+-------------------------+



# Grouping aggregations

In [24]:
#column object expression
summaryDF = invoiceDF.groupBy("Country","InvoiceNo")\
           .agg(sum("Quantity").alias("TotalQuantity"),sum(expr("Quantity * UnitPrice")).alias("InvoiceValue"))
summaryDF.show()

+--------------+---------+-------------+------------------+
|       Country|InvoiceNo|TotalQuantity|      InvoiceValue|
+--------------+---------+-------------+------------------+
|United Kingdom|   536446|          329|            440.89|
|United Kingdom|   536508|          216|            155.52|
|United Kingdom|   537811|           74|            268.86|
|United Kingdom|   538895|          370|            247.38|
|United Kingdom|   540453|          341|302.44999999999993|
|United Kingdom|   541291|          217|305.81000000000006|
|United Kingdom|   542551|           -1|               0.0|
|United Kingdom|   542576|           -1|               0.0|
|United Kingdom|   542628|            9|            132.35|
|United Kingdom|   542886|          199| 320.5099999999998|
|United Kingdom|   542907|           75|            313.85|
|United Kingdom|   543131|          134|             164.1|
|United Kingdom|   543189|          102|            153.94|
|United Kingdom|   543265|           -4|

In [25]:
#string expression
summaryDf1 = invoiceDF.groupBy("Country","InvoiceNo")\
            .agg(expr("sum(Quantity) as TotalQunatity"),expr("sum(Quantity * UnitPrice) as InvoiceValue"))
summaryDf1.show()

+--------------+---------+-------------+------------------+
|       Country|InvoiceNo|TotalQunatity|      InvoiceValue|
+--------------+---------+-------------+------------------+
|United Kingdom|   536446|          329|            440.89|
|United Kingdom|   536508|          216|            155.52|
|United Kingdom|   537811|           74|            268.86|
|United Kingdom|   538895|          370|            247.38|
|United Kingdom|   540453|          341|302.44999999999993|
|United Kingdom|   541291|          217|305.81000000000006|
|United Kingdom|   542551|           -1|               0.0|
|United Kingdom|   542576|           -1|               0.0|
|United Kingdom|   542628|            9|            132.35|
|United Kingdom|   542886|          199| 320.5099999999998|
|United Kingdom|   542907|           75|            313.85|
|United Kingdom|   543131|          134|             164.1|
|United Kingdom|   543189|          102|            153.94|
|United Kingdom|   543265|           -4|

In [22]:
#spark SQL
invoiceDF.createOrReplaceTempView("sales")
spark.sql("""select  country,InvoiceNo,sum(Quantity) as totQty,sum(Quantity * UnitPrice) asInvoiceValue 
         from sales group by country,InvoiceNo""").show()


+--------------+---------+------+------------------+
|       country|InvoiceNo|totQty|    asInvoiceValue|
+--------------+---------+------+------------------+
|United Kingdom|   536446|   329|            440.89|
|United Kingdom|   536508|   216|            155.52|
|United Kingdom|   537811|    74|            268.86|
|United Kingdom|   538895|   370|            247.38|
|United Kingdom|   540453|   341|302.44999999999993|
|United Kingdom|   541291|   217|305.81000000000006|
|United Kingdom|   542551|    -1|               0.0|
|United Kingdom|   542576|    -1|               0.0|
|United Kingdom|   542628|     9|            132.35|
|United Kingdom|   542886|   199| 320.5099999999998|
|United Kingdom|   542907|    75|            313.85|
|United Kingdom|   543131|   134|             164.1|
|United Kingdom|   543189|   102|            153.94|
|United Kingdom|   543265|    -4|               0.0|
|        Cyprus|   544574|   173|            320.69|
|United Kingdom|   545077|    24|             

[Stage 27:>                                                         (0 + 1) / 1]                                                                                

# 3. window aggregates

In [35]:
invoiceDF = spark.read \
           .format("csv") \
           .option("header", True) \
           .option("inferSchema", True) \
           .option("path", "windowdata-201025-223502.csv") \
           .load()


In [36]:
myWindow = Window.partitionBy("country")\
          .orderBy("weeknum")\
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)


In [37]:
mydf = invoiceDF.withColumn("RunningTotal",sum("invoicevalue").over(myWindow))
mydf.show()

+---------------+-------+-----------+-------------+------------+------------------+
|        country|weeknum|numinvoices|totalquantity|invoicevalue|      RunningTotal|
+---------------+-------+-----------+-------------+------------+------------------+
|      Australia|     48|          1|          107|      358.25|            358.25|
|      Australia|     49|          1|          214|       258.9|            617.15|
|      Australia|     50|          2|          133|      387.95|1005.0999999999999|
|        Austria|     50|          2|            3|      257.04|            257.04|
|        Bahrain|     51|          1|           54|      205.74|            205.74|
|        Belgium|     48|          1|          528|       346.1|             346.1|
|        Belgium|     50|          2|          285|      625.16|            971.26|
|        Belgium|     51|          2|          942|      838.65|1809.9099999999999|
|Channel Islands|     49|          1|           80|      363.53|            

# join on data frame

In [6]:
# Two kind of join
#1-simple join
#2-broadcast join

In [25]:
#1. inner (matching records from both the tables) we wont see the customer who never placed a order.
#2. outer - matching records + non matching records from left table + non matching records from right table 
#3. left - matching records + non matching records from the left table
#4. right - matching records + non matching records fromthe right table


In [7]:
orderDF = spark.read \
           .format("csv") \
           .option("header", True) \
           .option("inferSchema", True) \
           .option("path", "orders-201019-002101.csv") \
           .load()


In [8]:
customerDF = spark.read \
           .format("csv") \
           .option("header", True) \
           .option("inferSchema", True) \
           .option("path", "customers-201025-223502.csv") \
           .load()

In [11]:
#orderDF.show()

In [12]:
#customerDF.show()

In [22]:
joinCondition= orderDF.order_customer_id == customerDF.customer_id
joinType="left"

In [23]:
joinDF= orderDF.join(customerDF,joinCondition,joinType).sort("customer_id")

In [24]:
joinDF.show()

+--------+-------------------+-----------------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|order_id|         order_date|order_customer_id|   order_status|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+--------+-------------------+-----------------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|   68884|               null|             null|       COMPLETE|       null|          null|          null|          null|             null|                null|         null|          null|            null|
|   22945|2013-12-13 00:00:00|                1|       COMPLETE|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsvill

In [21]:
#for i in joinDF:
    #print(i)

In [18]:
k=joinDF.collect()

In [20]:
count=0
for i in k:
    count+=1
print(count)

68883


# Problem after joining

In [26]:
#1. showcasing how your code can lead to ambiguouscolumn names.
#this happens when we try to select a column name which is coming from 2 different dataframes..


In [28]:
# i am going to rename the column name in order.csv as customer_id

In [29]:
orderDF = spark.read \
           .format("csv") \
           .option("header", True) \
           .option("inferSchema", True) \
           .option("path", "orders-201019-002101.csv") \
           .load()


In [30]:
customerDF = spark.read \
           .format("csv") \
           .option("header", True) \
           .option("inferSchema", True) \
           .option("path", "customers-201025-223502.csv") \
           .load()

In [31]:
joinCondition= orderDF.customer_id == customerDF.customer_id
joinType="left"

In [33]:
orderDF.join(customerDF,joinCondition,joinType).show(5)


+--------+-------------------+-----------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|order_id|         order_date|customer_id|   order_status|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+--------+-------------------+-----------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|       1|2013-07-25 00:00:00|      11599|         CLOSED|      11599|          Mary|        Malone|     XXXXXXXXX|        XXXXXXXXX|8708 Indian Horse...|      Hickory|            NC|           28601|
|       2|2013-07-25 00:00:00|        256|PENDING_PAYMENT|        256|         David|     Rodriguez|     XXXXXXXXX|        XXXXXXXXX|7605 Tawny Horse ...|      Chicago|            IL|           60

In [34]:
#here we are getting customer_id twice

orderDF.join(customerDF,joinCondition,joinType).select("customer_id","order_id","order_status").show(5)
# we will get an error because of ambiguous column in two different dataframe and system get confused

AnalysisException: Reference 'customer_id' is ambiguous, could be: customer_id, customer_id.

In [35]:
#how to solve this issue.. ========================== 
#there are 2 ways to solve this problem 
#1. this is before the join
#you rename the ambiguous column in one of the dataframe .withColumnRenamed("old_column_name","new_column_name")


In [37]:
orderDF1=orderDF.withColumnRenamed("customer_id","cust_id")

In [38]:
joinCondition= orderDF1.cust_id == customerDF.customer_id
joinType="left"

In [40]:
orderDF1.join(customerDF,joinCondition,joinType).show(2)


+--------+-------------------+-------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|order_id|         order_date|cust_id|   order_status|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+--------+-------------------+-------+---------------+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|       1|2013-07-25 00:00:00|  11599|         CLOSED|      11599|          Mary|        Malone|     XXXXXXXXX|        XXXXXXXXX|8708 Indian Horse...|      Hickory|            NC|           28601|
|       2|2013-07-25 00:00:00|    256|PENDING_PAYMENT|        256|         David|     Rodriguez|     XXXXXXXXX|        XXXXXXXXX|7605 Tawny Horse ...|      Chicago|            IL|           60625|
+--------+-----

In [41]:
orderDF1.join(customerDF,joinCondition,joinType).select("customer_id","order_id","order_status").show(2)
#NOW WE WILL GET THE VALUE

+-----------+--------+---------------+
|customer_id|order_id|   order_status|
+-----------+--------+---------------+
|      11599|       1|         CLOSED|
|        256|       2|PENDING_PAYMENT|
+-----------+--------+---------------+
only showing top 2 rows



In [42]:
#2. once the join is done we can drop one of those columns. 
#.drop

In [44]:
joinCondition= orderDF.customer_id == customerDF.customer_id
joinType="left"

In [50]:
a=orderDF.join(customerDF,joinCondition,joinType).drop(orderDF.customer_id)


In [51]:
a.select("customer_id","order_id","order_status").show(5)


+-----------+--------+---------------+
|customer_id|order_id|   order_status|
+-----------+--------+---------------+
|      11599|       1|         CLOSED|
|        256|       2|PENDING_PAYMENT|
|      12111|       3|       COMPLETE|
|       8827|       4|         CLOSED|
|      11318|       5|       COMPLETE|
+-----------+--------+---------------+
only showing top 5 rows

