In [0]:
from pyspark.sql import SparkSession
# Obtain the SparkSession for this notebook (reuses an existing session if
# one is already running) and echo it as the cell output.
spark = (
    SparkSession.builder
    .appName('MiniProject')
    .getOrCreate()
)
spark

In [0]:
# --- Storage account / app-registration identifiers -------------------------
storage_account = "mystoacckad"
application_id = "14a14259-70ba-4c26-a136-262468ab64da"
directory_id = "26af9d76-35fe-404a-b312-869c37aec9c7"
container_name = "fileshare"

# The client secret is fetched from a Databricks secret scope rather than
# being written into the notebook.
service_credential = dbutils.secrets.get(
    scope="Secrete-scope-databricks1",
    key="app-reg-secrets1",
)

# --- OAuth (client credentials) settings for the account's dfs endpoint -----
spark.conf.set(
    f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
    "OAuth",
)
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
    application_id,
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
    service_credential,
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
    f"https://login.microsoftonline.com/{directory_id}/oauth2/token",
)

# Load the customers CSV from the container; the first row is the header.
# No schema inference is requested here, so column types are cast explicitly
# in a later cell.
df = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/databricks-project/customers.csv")
)

In [0]:
# Quick look at the first five customer rows.
df.show(n=5)

In [0]:
# Print the column names and types of the freshly loaded customers DataFrame.
df.printSchema()

In [0]:
from pyspark.sql.functions import *

# Convert registration_date to a DateType (parsed as yyyy-MM-dd) and
# is_active to a boolean, replacing the original columns in place.
registration_as_date = to_date(col('registration_date'), 'yyyy-MM-dd')
active_as_bool = col('is_active').cast('boolean')

df = (
    df.withColumn('registration_date', registration_as_date)
      .withColumn('is_active', active_as_bool)
)


In [0]:
# Verify the casts: sample rows first, then the updated schema.
df.show(n=5)
df.printSchema()

In [0]:
# Replace missing location values with the placeholder 'Unknown'.
# DataFrame.fillna returns a *new* DataFrame (Spark DataFrames are immutable),
# so the result must be assigned back; otherwise the nulls stay in `df` and
# the later "city = 'Unknown'" filter matches nothing.
df = df.fillna({'city': 'Unknown', 'state': 'Unknown', 'country': 'Unknown'})

In [0]:
# Derive the calendar year of registration as its own column.
df = df.withColumn(
    'registration_year',
    year('registration_date'),
)

In [0]:
# Show rows (default row limit) and the schema to confirm the new
# registration_year column.
df.show()
df.printSchema()

In [0]:
# Derive the month-of-year of registration, then inspect the result.
df = df.withColumn(
    'registration_month',
    month('registration_date'),
)
df.show(n=5)
df.printSchema()

In [0]:
# Customers whose city equals the 'Unknown' placeholder, rendered both as
# plain text and through the notebook's rich display.
unknown_city_filter = "city = 'Unknown'"
df_select = df.select('city', 'country').where(unknown_city_filter)

df_select.show(n=5)
display(df_select)

In [0]:
# Number of distinct city values, computed three equivalent ways.

# 1) collect() returns a list of Rows; take the first Row, then its first field.
unique_cities=df.select(countDistinct('city')).collect()
unique_cities=unique_cities[0][0]
print(unique_cities)

# 2) Same result in one line: first() returns the single Row directly.
unique_cities = df.select(countDistinct('city')).first()[0]
print(unique_cities)

# 3) Alias the aggregate so the value can be read by name instead of position.
unique_cities = (
    df.select(countDistinct('city').alias('cnt'))
      .first()['cnt']
)
print(unique_cities)


In [0]:
# Build the one-row distinct-city count once, then render it with the
# notebook's rich display and as a plain-text table.
distinct_city_df = df.select(countDistinct('city'))
display(distinct_city_df)
distinct_city_df.show()

In [0]:
    # Customers per (city, country) pair, largest groups first.
    display(
        df.groupBy('city', 'country')
          .count()
          .withColumnRenamed('count', 'cnt')
          .orderBy(col('cnt').desc())
    )

In [0]:
# Pivot table: customer count per state, one column per distinct
# is_active value.
active_by_state = (
    df.groupBy('state')
      .pivot('is_active')
      .count()
)
active_by_state.show()

In [0]:
from pyspark.sql.window import Window
# Rank customers within each state by registration date, most recent first.
# rank / dense_rank / row_number differ only in how they number tied dates.
window_spec = (
    Window
    .partitionBy('state')
    .orderBy(col('registration_date').desc())
)

df = (
    df.withColumn('rank', rank().over(window_spec))
      .withColumn('dense_rank', dense_rank().over(window_spec))
      .withColumn('row_number', row_number().over(window_spec))
)

df.show()

In [0]:
# Customers registered on or after 2023-09-01, oldest of those first.
registered_since_cutoff = col("registration_date") >= lit('2023-09-01')

df_recent_cust = (
    df.filter(registered_since_cutoff)
      .orderBy(col('registration_date'))
)
df_recent_cust.show()
df_recent_cust.count()

In [0]:
# Oldest and newest registration date per city.
# `min` / `max` here are the pyspark.sql.functions aggregates brought in by
# the wildcard import earlier in the notebook, not the Python built-ins.
# show must be *called*: a bare `.show` only references the bound method, so
# no Spark job runs and nothing is printed.
df.groupBy('city').agg(
    min('registration_date').alias('oldest'),
    max('registration_date').alias('newest'),
).show()

In [0]:
# Persist the processed customers as Parquet, replacing any earlier output.
# NOTE(review): this is a hard-coded, user-specific workspace path — consider
# moving it to a shared/configurable location.
output_path = '/Workspace/Users/kadambarinaik07@gmail.com/ProjectNotebooks/processed_customers'

df.write.parquet(output_path, mode='overwrite')

In [0]:
# Load the orders CSV (header row present) and let Spark infer column types.
orders_df = spark.read.csv(
    f"abfss://{container_name}@{storage_account}.dfs.core.windows.net/databricks-project/orders.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Inner join on an explicit column-equality expression. Joining this way
# keeps customer_id from both sides, i.e. the column is duplicated.
customers_order_df = df.join(
    orders_df,
    on=df['customer_id'] == orders_df['customer_id'],
    how="inner",
)

In [0]:
# Inner join by column *name*, which yields a single customer_id column in
# the result; print the schema to confirm.
customers_order_df = df.join(orders_df, on="customer_id", how="inner")
customers_order_df.printSchema()

In [0]:
# Sample of the joined customers/orders rows.
customers_order_df.show(n=5)

In [0]:
# Number of joined order rows per customer, most frequent customers first.
customers_orders_count = (
    customers_order_df
    .groupBy('customer_id')
    .count()
    .orderBy(col('count').desc())
)
customers_orders_count.show(n=5)

In [0]:
# Total spend per customer, biggest spenders first.
# `sum` is the Spark aggregate from the notebook's wildcard functions import.
total_amt = sum('total_amount').alias('total_amt')

customer_total_spend = (
    customers_order_df
    .groupBy('customer_id')
    .agg(total_amt)
    .orderBy(col('total_amt').desc())
)
customer_total_spend.show(n=5)

In [0]:
# Order count per status value, most common status first.
orders_status_df = (
    customers_order_df
    .groupBy('status')
    .count()
    .withColumnRenamed('count', 'cnt_orders')
    .orderBy(col('cnt_orders').desc())
)
orders_status_df.show()

In [0]:
# Order count per month-of-year (the month number only, so the same month
# from different years is grouped together), busiest month first.
orders_with_month = customers_order_df.withColumn(
    'order_month', month(col('order_date'))
)

order_by_month = (
    orders_with_month
    .groupBy('order_month')
    .count()
    .withColumnRenamed('count', 'order_mon_cnt')
    .orderBy(col('order_mon_cnt').desc())
)

order_by_month.show(n=5)

In [0]:
# Top ten rows of the per-customer spend table.
customer_total_spend.show(n=10)

In [0]:
# Rank all customers by total spend, highest first. The window has no
# partitionBy, so the ranking is global across every customer.
window_spec = Window.orderBy(col('total_amt').desc())

ranked_customers = (
    customer_total_spend
    .withColumn('dense_rank', dense_rank().over(window_spec))
    .withColumn('rank', rank().over(window_spec))
    .withColumn('rownumber', row_number().over(window_spec))
)

ranked_customers.show(n=5)

In [0]:
# Surface customers with high order frequency but low total spend:
# combine the per-customer order counts with the per-customer totals, then
# sort by order count (descending) and, within that, by spend (ascending).
customer_spend_vs_order = (
    customers_orders_count
    .join(customer_total_spend, on='customer_id', how='inner')
    .orderBy(col('count').desc(), col('total_amt').asc())
)
customer_spend_vs_order.show()