In [1]:
from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one, otherwise start one named "sampleApp".
spark = (
    SparkSession.builder
    .appName("sampleApp")
    .getOrCreate()
)

In [None]:
# Reference: DataFrame methods covered in this notebook (signatures only).
# Kept as comments so that this cell runs cleanly under Restart & Run All;
# as bare expressions these lines raise NameError.
#   agg(*exprs)
#   join(other, on=None, how=None)
#   crossJoin(other)
#   sort(*cols, **kwargs)
#   sortWithinPartitions(*cols, **kwargs)
#   groupBy(*cols) / groupby(*cols)
#   orderBy(*cols, **kwargs)
#   union(other)
#   unionByName(other)
#   hint(name, *parameters)
#   coalesce(numPartitions)
#   corr(col1, col2, method=None)
#   randomSplit(weights, seed=None)
#   repartition(numPartitions, *cols)
#   repartitionByRange(numPartitions, *cols)

In [2]:
# Create the cruise-ship DataFrame from CSV: the first row is the header and
# column types are inferred (see the printSchema output below).
# Set the CRUISE_SHIP_CSV environment variable to point at your own copy,
# rather than editing this user-specific absolute path.
import os

CRUISE_SHIP_CSV = os.environ.get(
    "CRUISE_SHIP_CSV",
    "/dnbusr1/sambasivaraot/PySpark/input_data/cruise_ship_info.csv",
)
df = spark.read.csv(CRUISE_SHIP_CSV, header=True, inferSchema=True)

In [3]:
# Show the column names and types (types were inferred via inferSchema=True on load).
df.printSchema()

root
 |-- Ship_name: string (nullable = true)
 |-- Cruise_line: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tonnage: double (nullable = true)
 |-- passengers: double (nullable = true)
 |-- length: double (nullable = true)
 |-- cabins: double (nullable = true)
 |-- passenger_density: double (nullable = true)
 |-- crew: double (nullable = true)



In [4]:
# Preview the first 4 rows.
df.show(n=4)

+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|  Ship_name|Cruise_line|Age|           Tonnage|passengers|length|cabins|passenger_density|crew|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
|    Journey|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|      Quest|    Azamara|  6|30.276999999999997|      6.94|  5.94|  3.55|            42.64|3.55|
|Celebration|   Carnival| 26|            47.262|     14.86|  7.22|  7.43|             31.8| 6.7|
|   Conquest|   Carnival| 11|             110.0|     29.74|  9.53| 14.88|            36.99|19.1|
+-----------+-----------+---+------------------+----------+------+------+-----------------+----+
only showing top 4 rows



### agg(*exprs)

In [7]:
# Maximum ship age, using the dict form of agg(): {column name: aggregate function name}.
(
    df
    .agg({"Age": "max"})
    .show()
)

+--------+
|max(Age)|
+--------+
|      48|
+--------+



In [8]:
from pyspark.sql import functions as F

In [10]:
# The same kind of aggregate, expressed as a Column function from pyspark.sql.functions.
(
    df
    .agg(F.max(df["Tonnage"]))
    .show()
)

+------------+
|max(Tonnage)|
+------------+
|       220.0|
+------------+



### join(other, on=None, how=None)
###### other - Right side of the join
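###### on    - str, list of str, Column, or list of Column (optional). A join column name, a list of column names, a join expression, or a list of Column conditions (combined with AND). If `on` is a name or a list of names, those columns must exist on both sides and an equi-join is performed.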
###### how   – str, default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

In [None]:
# NOTE(review): df2 and df3 are not defined in this notebook, and these examples
# expect df to have `name` and `age` columns; the cruise-ship df loaded above has
# no `name` column. Define small sample DataFrames before running this cell.

# A join expression (Column). With 'outer', both `name` columns are kept, so
# select them through their parent DataFrame.
df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).show()

# A string for the join column name: an equi-join on `name`.
df.join(df2, 'name', 'outer').select('name', 'height').show()

# A list of Columns (the conditions are ANDed). The join must use the same
# DataFrame (df3) that the conditions reference; joining df2 here would not match them.
cond = [df.name == df3.name, df.age == df3.age]
df.join(df3, cond, 'outer').select(df.name, df3.age).show()

# `how` defaults to 'inner'.
df.join(df2, 'name').select('name', 'height').show()

# A list of column names that exist on both sides.
df.join(df2, ['name', 'age']).select(df.name, df.age).collect()

In [None]:
# Sample data: customers, and orders that reference a customer by name.
valuesA = [(1, 'bob', 3462543658686),(2, 'rob', 9087567565439),(3, 'tim', 5436586999467),(4, 'tom', 8349756853250)]
valuesB = [(1, 'ketchup', 'bob', 1.20),(2, 'rutabaga', 'bob', 3.35),(3, 'fake vegan meat', 'rob', 13.99),(4, 'cheesey poofs', 'tim', 3.99),(5, 'ice cream', 'tim', 4.95),(6, 'protein powder', 'tom', 49.95)]

# Create DataFrames
customerDF = spark.createDataFrame(valuesA, ['id', 'name', 'credit_card_number'])
ordersDF = spark.createDataFrame(valuesB, ['id', 'product_name', 'customer', 'price'])

# Join condition shared by the keyed joins below.
join_cond = customerDF.name == ordersDF.customer

# Inner join. 'inner' is the default `how`, so passing it explicitly is optional.
joinedDF = customerDF.join(ordersDF, join_cond, 'inner')
joinedDF.show()

# Right (outer) join: keeps every order row, matched or not.
right_joinDF = customerDF.join(ordersDF, join_cond, 'right')
right_joinDF.show()

# Left (outer) join: keeps every customer row, matched or not.
left_joinDF = customerDF.join(ordersDF, join_cond, 'left')
left_joinDF.show()

# Cross join: Cartesian product (every customer paired with every order).
# Stored under its own name so it does not overwrite the inner-join result.
cross_joinDF = customerDF.crossJoin(ordersDF)
cross_joinDF.show()

## Aggregating_Data
Two functions are used here: `agg()` and `groupBy()`.
They are typically used together, but `agg()` can also aggregate a whole DataFrame without `groupBy()`.

#df.agg({"<df column>":"function"}).show()

df.agg({"Sales":"min"}).show()

from pyspark.sql import functions as F

df.groupBy('borough').agg(F.count('borough').alias('count')).show()

df.groupBy('borough').agg(F.sum('number_of_persons_injured').alias('injuries')).orderBy('injuries', ascending=False).show()

## Grouping_by_multiple_columns
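aggDF = df.groupBy('borough','contributing_factor_vehicle_1').agg(F.sum('number_of_persons_injured').alias('injuries')).orderBy('injuries', ascending=False)
aggDF.show()

(`show()` prints the rows and returns `None`, so keep it off the assignment; otherwise `aggDF` would be `None`.)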

Note: So far we've aggregated by using the count and sum functions. As you might imagine, we could also aggregate by using the min, max, and avg functions. 

## Determining Column Correlation
`corr()` computes the Pearson correlation coefficient between two numeric columns and returns it as a double between -1 and 1:
df.agg(F.corr("a", "b").alias('correlation')).collect()

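## Import CSV File into Spark Dataframe
Note: this older `SQLContext` snippet rebinds the name `spark`, shadowing the `SparkSession` created at the top of this notebook. It also calls `SparkContext()`, which fails if a SparkContext is already running. With an existing session, read the file with `spark.read.csv(...)` instead.

import pyspark as spark
 
sc = spark.SQLContext(spark.SparkContext()) 
sdf1 = sc.read.csv("Documents/nycflights13.csv", header = True, inferSchema = True)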

# Data Aggregation with Spark Dataframe
import pyspark.sql.functions as fn
 
sdf1.cache().filter("month in (1, 3, 5)").groupby("month").agg(fn.mean("dep_time").alias("avg_dep"), fn.mean("arr_time").alias("avg_arr")).show()

# Data Aggregation with Spark SQL
sc.registerDataFrameAsTable(sdf1, "tbl1")
 
sc.sql("select month, avg(dep_time) as avg_dep, avg(arr_time) as avg_arr from tbl1 where month in (1, 3, 5) group by month").show()
 
sc.dropTempTable("tbl1")

### crossJoin(other)
#### Returns the cartesian product with another DataFrame.

In [None]:
# NOTE(review): assumes df has `age`/`name` columns and df2 has `name`/`height`;
# the cruise-ship df loaded above has no `name` column, so define sample frames first.
# Only a cell's last expression is displayed, so show the inputs explicitly.
df.select("age", "name").show()
df2.select("name", "height").show()

# Cartesian product: every row of df paired with every `height` row of df2.
# show() prints a bounded preview instead of collecting the whole product to the driver.
df.crossJoin(df2.select("height")).select("age", "name", "height").show()

### sort(*cols, **kwargs)

In [None]:
# Returns a new DataFrame sorted by the specified column(s).
# Only a cell's last expression is displayed, so each example calls .show().
# Explicit import instead of `import *`, which would shadow builtins such as max/sum/abs.
from pyspark.sql.functions import asc, desc

# Descending by age: three equivalent spellings (orderBy is an alias of sort).
df.sort(df.age.desc()).show()
df.sort("age", ascending=False).show()
df.orderBy(df.age.desc()).show()

# Using the asc/desc column helpers from pyspark.sql.functions.
df.sort(asc("age")).show()
df.orderBy(desc("age"), "name").show()

# One direction per column: age descending, then name ascending.
df.orderBy(["age", "name"], ascending=[False, True]).show()

### sortWithinPartitions(*cols, **kwargs)

In [None]:
# Order rows by age, descending, inside each partition only; the result is not
# globally sorted across partitions.
(
    df
    .sortWithinPartitions(df["age"].desc())
    .show()
)