## Source data & install

In [1]:
# Install the dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop3.2.tgz
!tar xf spark-3.0.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
# Point PySpark at the Java runtime and at the unpacked Spark distribution
# before any Spark session is started in this notebook environment.
import os

os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.0.1-bin-hadoop3.2",
    }
)

In [3]:
# Run the local session to test the installation
import findspark
# Called without an argument, findspark resolves the installation from the
# SPARK_HOME environment variable (set to an absolute path earlier in the
# notebook), so this cell no longer depends on the current working directory.
findspark.init()
from pyspark.sql import SparkSession
# local[*] runs Spark in-process using all available cores.
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [4]:
# Build the sample DataFrame: one row per project, holding its id and its
# start / end dates as ISO-formatted strings.
project_columns = ['proj_id', 'proj_start', 'proj_end']
project_rows = [
    (1, '2020-01-01', '2020-01-02'),
    (2, '2020-01-02', '2020-01-03'),
    (3, '2020-01-03', '2020-01-04'),
    (4, '2020-01-04', '2020-01-05'),
    (5, '2020-01-06', '2020-01-07'),
    (6, '2020-01-16', '2020-01-17'),
    (7, '2020-01-17', '2020-01-18'),
    (8, '2020-01-18', '2020-01-19'),
    (9, '2020-01-19', '2020-01-20'),
    (10, '2020-01-21', '2020-01-22'),
    (11, '2020-01-26', '2020-01-27'),
    (12, '2020-01-27', '2020-01-28'),
    (13, '2020-01-28', '2020-01-29'),
    (14, '2020-01-29', '2020-01-30'),
]
projects = spark.createDataFrame(project_rows, project_columns)

In [5]:
# Display the raw rows and the inferred schema; at this point both date
# columns are still plain strings.
projects.show()
projects.printSchema()

+-------+----------+----------+
|proj_id|proj_start|  proj_end|
+-------+----------+----------+
|      1|2020-01-01|2020-01-02|
|      2|2020-01-02|2020-01-03|
|      3|2020-01-03|2020-01-04|
|      4|2020-01-04|2020-01-05|
|      5|2020-01-06|2020-01-07|
|      6|2020-01-16|2020-01-17|
|      7|2020-01-17|2020-01-18|
|      8|2020-01-18|2020-01-19|
|      9|2020-01-19|2020-01-20|
|     10|2020-01-21|2020-01-22|
|     11|2020-01-26|2020-01-27|
|     12|2020-01-27|2020-01-28|
|     13|2020-01-28|2020-01-29|
|     14|2020-01-29|2020-01-30|
+-------+----------+----------+

root
 |-- proj_id: long (nullable = true)
 |-- proj_start: string (nullable = true)
 |-- proj_end: string (nullable = true)



In [6]:
# Change type from string to date.
# The built-in to_date function does the parsing inside Spark, so no Python
# UDF is needed, and a null input yields a null date instead of raising
# inside datetime.strptime.
from datetime import datetime  # retained from the original cell; unused by this conversion
from pyspark.sql.functions import col, udf, to_date
from pyspark.sql.types import *  # retained from the original cell; unused by this conversion

# Spark datetime pattern matching the 'YYYY-MM-DD' strings in the source data.
date_pattern = "yyyy-MM-dd"
df = (
    projects
    .withColumn('proj_start', to_date(col('proj_start'), date_pattern))
    .withColumn('proj_end', to_date(col('proj_end'), date_pattern))
)

In [7]:
# Display the converted rows and check that proj_start / proj_end now have
# the date type in the schema.
df.show()
df.printSchema()

+-------+----------+----------+
|proj_id|proj_start|  proj_end|
+-------+----------+----------+
|      1|2020-01-01|2020-01-02|
|      2|2020-01-02|2020-01-03|
|      3|2020-01-03|2020-01-04|
|      4|2020-01-04|2020-01-05|
|      5|2020-01-06|2020-01-07|
|      6|2020-01-16|2020-01-17|
|      7|2020-01-17|2020-01-18|
|      8|2020-01-18|2020-01-19|
|      9|2020-01-19|2020-01-20|
|     10|2020-01-21|2020-01-22|
|     11|2020-01-26|2020-01-27|
|     12|2020-01-27|2020-01-28|
|     13|2020-01-28|2020-01-29|
|     14|2020-01-29|2020-01-30|
+-------+----------+----------+

root
 |-- proj_id: long (nullable = true)
 |-- proj_start: date (nullable = true)
 |-- proj_end: date (nullable = true)



## Spark SQL

In [8]:
# Register the DataFrame as the temporary view "tbl" so it can be referenced
# from spark.sql queries.
df.createOrReplaceTempView("tbl")

In [9]:
# Query: merge consecutive projects into groups and report each group's span.
#   p2 - flag is 1 when a project does NOT start on the previous project's
#        end date (i.e. it opens a new group), otherwise 0.
#   p3 - the running sum of flag over proj_id gives every row its group number.
#   outer select - first start, last end and length in days per group.
# delta is measured from the group's first start date to its last end date
# (the earlier version subtracted min(proj_end), which only gives the same
# number when every project in the group lasts exactly one day).
df_sql = spark.sql("""
    select
        p3.proj_group,
        min(p3.proj_start) as date_start,
        max(p3.proj_end) as date_end,
        datediff(max(p3.proj_end), min(p3.proj_start)) as delta
    from
        (select
            p2.*,
            sum(p2.flag) over (order by p2.proj_id) as proj_group
         from
            (select
                p.proj_id,
                p.proj_start,
                p.proj_end,
                case
                    when lag(p.proj_end) over (order by p.proj_id) = p.proj_start then 0
                    else 1
                end as flag
             from tbl as p) as p2) as p3
    group by p3.proj_group
""")

In [10]:
# Display the grouped result of the SQL query together with its schema.
df_sql.show()
df_sql.printSchema()

+----------+----------+----------+-----+
|proj_group|date_start|  date_end|delta|
+----------+----------+----------+-----+
|         1|2020-01-01|2020-01-05|    4|
|         2|2020-01-06|2020-01-07|    1|
|         3|2020-01-16|2020-01-20|    4|
|         4|2020-01-21|2020-01-22|    1|
|         5|2020-01-26|2020-01-30|    4|
+----------+----------+----------+-----+

root
 |-- proj_group: long (nullable = true)
 |-- date_start: date (nullable = true)
 |-- date_end: date (nullable = true)
 |-- delta: integer (nullable = true)



## Spark DataFrame

In [11]:
from pyspark.sql.functions import lag
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Counterpart of pandas' DataFrame.shift(): for every project, fetch the end
# date of the preceding project (ordered by proj_id) into a new "lag" column.
# The window has no partitioning, so all rows form a single ordered sequence.
w = Window.orderBy("proj_id")
previous_end = F.lag("proj_end").over(w)
df_dataframe = df.withColumn('lag', previous_end)
df_dataframe.show()

+-------+----------+----------+----------+
|proj_id|proj_start|  proj_end|       lag|
+-------+----------+----------+----------+
|      1|2020-01-01|2020-01-02|      null|
|      2|2020-01-02|2020-01-03|2020-01-02|
|      3|2020-01-03|2020-01-04|2020-01-03|
|      4|2020-01-04|2020-01-05|2020-01-04|
|      5|2020-01-06|2020-01-07|2020-01-05|
|      6|2020-01-16|2020-01-17|2020-01-07|
|      7|2020-01-17|2020-01-18|2020-01-17|
|      8|2020-01-18|2020-01-19|2020-01-18|
|      9|2020-01-19|2020-01-20|2020-01-19|
|     10|2020-01-21|2020-01-22|2020-01-20|
|     11|2020-01-26|2020-01-27|2020-01-22|
|     12|2020-01-27|2020-01-28|2020-01-27|
|     13|2020-01-28|2020-01-29|2020-01-28|
|     14|2020-01-29|2020-01-30|2020-01-29|
+-------+----------+----------+----------+



In [12]:
# Counterpart of SQL's CASE WHEN ... THEN ... ELSE ... END:
# flag is 0 when the project starts on the previous project's end date and 1
# otherwise (including the first row, whose lag is null).
starts_where_previous_ended = F.col("proj_start") == F.col("lag")
df_dataframe = df_dataframe.withColumn(
    'flag',
    F.when(starts_where_previous_ended, 0).otherwise(1),
)
df_dataframe.show()

+-------+----------+----------+----------+----+
|proj_id|proj_start|  proj_end|       lag|flag|
+-------+----------+----------+----------+----+
|      1|2020-01-01|2020-01-02|      null|   1|
|      2|2020-01-02|2020-01-03|2020-01-02|   0|
|      3|2020-01-03|2020-01-04|2020-01-03|   0|
|      4|2020-01-04|2020-01-05|2020-01-04|   0|
|      5|2020-01-06|2020-01-07|2020-01-05|   1|
|      6|2020-01-16|2020-01-17|2020-01-07|   1|
|      7|2020-01-17|2020-01-18|2020-01-17|   0|
|      8|2020-01-18|2020-01-19|2020-01-18|   0|
|      9|2020-01-19|2020-01-20|2020-01-19|   0|
|     10|2020-01-21|2020-01-22|2020-01-20|   1|
|     11|2020-01-26|2020-01-27|2020-01-22|   1|
|     12|2020-01-27|2020-01-28|2020-01-27|   0|
|     13|2020-01-28|2020-01-29|2020-01-28|   0|
|     14|2020-01-29|2020-01-30|2020-01-29|   0|
+-------+----------+----------+----------+----+



In [13]:
# Running total of the flag column in proj_id order; every row whose flag is 1
# bumps the total, so the result serves as the group number.
w = Window.orderBy("proj_id")
running_flag_total = F.sum("flag").over(w)
df_dataframe = df_dataframe.withColumn("proj_group", running_flag_total)
df_dataframe.show()

+-------+----------+----------+----------+----+----------+
|proj_id|proj_start|  proj_end|       lag|flag|proj_group|
+-------+----------+----------+----------+----+----------+
|      1|2020-01-01|2020-01-02|      null|   1|         1|
|      2|2020-01-02|2020-01-03|2020-01-02|   0|         1|
|      3|2020-01-03|2020-01-04|2020-01-03|   0|         1|
|      4|2020-01-04|2020-01-05|2020-01-04|   0|         1|
|      5|2020-01-06|2020-01-07|2020-01-05|   1|         2|
|      6|2020-01-16|2020-01-17|2020-01-07|   1|         3|
|      7|2020-01-17|2020-01-18|2020-01-17|   0|         3|
|      8|2020-01-18|2020-01-19|2020-01-18|   0|         3|
|      9|2020-01-19|2020-01-20|2020-01-19|   0|         3|
|     10|2020-01-21|2020-01-22|2020-01-20|   1|         4|
|     11|2020-01-26|2020-01-27|2020-01-22|   1|         5|
|     12|2020-01-27|2020-01-28|2020-01-27|   0|         5|
|     13|2020-01-28|2020-01-29|2020-01-28|   0|         5|
|     14|2020-01-29|2020-01-30|2020-01-29|   0|         5|
+-------+----------+----------+----------+----+----------+

In [14]:
# Equivalent of SQL - GROUP BY
# The aggregates are taken from the F namespace instead of being imported by
# bare name: "from pyspark.sql.functions import min, max" would replace
# Python's built-in min/max for every cell that runs afterwards.
df_group = (
    df_dataframe
    .groupBy("proj_group")
    .agg(
        F.min("proj_start").alias("date_start"),  # first start date in the group
        F.max("proj_end").alias("date_end"),      # last end date in the group
    )
    # Length of the group in days, from its first start to its last end.
    .withColumn("delta", F.datediff(F.col("date_end"), F.col("date_start")))
)
df_group.show()
df_group.printSchema()

+----------+----------+----------+-----+
|proj_group|date_start|  date_end|delta|
+----------+----------+----------+-----+
|         1|2020-01-01|2020-01-05|    4|
|         2|2020-01-06|2020-01-07|    1|
|         3|2020-01-16|2020-01-20|    4|
|         4|2020-01-21|2020-01-22|    1|
|         5|2020-01-26|2020-01-30|    4|
+----------+----------+----------+-----+

root
 |-- proj_group: long (nullable = true)
 |-- date_start: date (nullable = true)
 |-- date_end: date (nullable = true)
 |-- delta: integer (nullable = true)

