# Spark on Colab

⚡️ Apache Spark is a powerful distributed computing framework designed for processing large-scale data efficiently. It offers a wide range of functionalities for data manipulation and machine learning. As a Data Scientist, learning Spark can help you handle massive datasets and perform complex analyses with ease.

☁️ Google Colab is a free, hosted Jupyter Notebook service for running Python code. It suits anyone who wants to avoid a local installation or who lacks powerful hardware, beginners in particular, and it inherits Jupyter's strengths: interactivity, visualizations, and inline documentation.

🤝 Combining Spark with Google Colab can enhance your data science projects by leveraging Spark's scalability and Colab's user-friendly interface.





## Installing Spark



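There are several ways to get Spark. The easiest is to install `pyspark`, the Python API for Apache Spark, with pip. The `findspark` library then locates the Spark installation and makes it importable; with a pip-installed `pyspark` this step is usually optional, but it is harmless to keep. `pyngrok` is installed here as well, for the optional Spark UI tunnel described later in this section.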

In [1]:
!pip install pyspark --quiet
!pip install findspark --quiet
!pip install pyngrok --quiet

In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master('local[3]') \
        .appName('ColabSession') \
        .getOrCreate()

Apache Spark provides a suite of web user interfaces (UIs) for monitoring the status and resource consumption of your Spark cluster. Because we're using Colab, the Spark UI is served on the Colab machine's localhost (port 4040 by default), which your browser cannot reach directly. To make it available we need the help of `ngrok`, a platform that creates a secure tunnel to locally hosted applications using a reverse proxy. This step is optional, but it's free and quite interesting.

In [None]:
from pyngrok import ngrok, conf
import getpass

print('Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken')
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f'Spark UI URL (ngrok tunnel) {public_url}')

## Data Exploration

We'll download a Plotly dataset to play around with.

In [4]:
# Downloading the oil and gas dataset
!wget https://raw.githubusercontent.com/plotly/datasets/master/oil-and-gas.parquet --quiet

In [5]:
# Reading in the dataset
df = spark.read.parquet('oil-and-gas.parquet')

In [6]:
# Taking a look at the dataset
df.show(5)

+--------------+-----------------+-------------------+-----------------+------------------+----------------+-------------------+---------------+-------------------+--------------------+-----------------+
|Reporting Year|Gas Produced, MCF|Water Produced, bbl|Oil Produced, bbl| Surface Longitude|Surface Latitude|          Well Name|      Well Type|Date Well Completed|         Well Status|__index_level_0__|
+--------------+-----------------+-------------------+-----------------+------------------+----------------+-------------------+---------------+-------------------+--------------------+-----------------+
|          2003|              0.0|                0.0|              0.0|         -78.01117|         42.0834|Vossler Katherine 1|    Gas Wildcat|1944-04-25 00:00:00|Plugged and Aband...|                0|
|          2001|              0.0|                0.0|              0.0|         -78.01117|         42.0834|Vossler Katherine 1|    Gas Wildcat|1944-04-25 00:00:00|Plugged and Aband...

In [7]:
# Learning about the dataset
print(f'Number of rows: {df.count()}')
print(f'Number of columns: {len(df.columns)}')
print('')
print('Column name, type:')
for name, dtype in df.dtypes:
  print(' ', name, ', ', dtype)

Number of rows: 260374
Number of columns: 11

Column name, type:
  Reporting Year ,  bigint
  Gas Produced, MCF ,  double
  Water Produced, bbl ,  double
  Oil Produced, bbl ,  double
  Surface Longitude ,  double
  Surface Latitude ,  double
  Well Name ,  string
  Well Type ,  string
  Date Well Completed ,  timestamp_ntz
  Well Status ,  string
  __index_level_0__ ,  bigint


In [8]:
# Learning about the columns
print('Column name, no. unique levels:')
for i in df.columns:
  print(' ', i, ', ', df.select(i).distinct().count())

Column name, no. unique levels:
  Reporting Year ,  31
  Gas Produced, MCF ,  16828
  Water Produced, bbl ,  1730
  Oil Produced, bbl ,  910
  Surface Longitude ,  13469
  Surface Latitude ,  11930
  Well Name ,  14228
  Well Type ,  17
  Date Well Completed ,  8125
  Well Status ,  15
  __index_level_0__ ,  260374


In [9]:
# Learning about missing data
from pyspark.sql.functions import col, sum as spark_sum  # alias avoids shadowing Python's built-in sum

# 1 where the value is null, 0 otherwise, for every column
missing_counts = df.select([col(column).isNull().cast('int').alias(column) for column in df.columns])

print('Column name, no. missing entries:')
for i in df.columns:
  missing_sum = missing_counts.agg(spark_sum(i)).collect()[0][0]
  print(' ', i, ', ', missing_sum)

Column name, no. missing entries:
  Reporting Year ,  0
  Gas Produced, MCF ,  0
  Water Produced, bbl ,  0
  Oil Produced, bbl ,  0
  Surface Longitude ,  0
  Surface Latitude ,  0
  Well Name ,  0
  Well Type ,  0
  Date Well Completed ,  0
  Well Status ,  0
  __index_level_0__ ,  0


In [10]:
# Learning about column levels
df.groupBy('Well Type').count().orderBy(col('count').desc()).show()

+--------------------+------+
|           Well Type| count|
+--------------------+------+
|     Gas Development|186942|
|     Oil Development| 52142|
|         Gas Wildcat|  7376|
|       Gas Extension|  5737|
|       Oil Injection|  4707|
|         Oil Wildcat|  1288|
|         Dry Wildcat|   879|
|            Dry Hole|   511|
|  Monitoring Storage|   308|
|             Storage|   153|
|          Not Listed|    90|
|       Oil Extension|    82|
|            Disposal|    58|
|       Stratigraphic|    56|
|          Geothermal|    24|
|Monitoring Miscel...|    11|
|               Brine|    10|
+--------------------+------+



In [11]:
# Learning about the date range of well completions
from pyspark.sql.functions import min, max

print(f"Earliest Well completion in dataset: {df.agg(min('Date Well Completed')).collect()[0][0]}")
print(f"Latest Well completion in dataset: {df.agg(max('Date Well Completed')).collect()[0][0]}")

Earliest Well completion in dataset: 1881-08-01 00:00:00
Latest Well completion in dataset: 2016-06-07 00:00:00


## Data Editing

In [12]:
# Filtering the dataset
s_date = '1960-01-01 00:00:00'
e_date = '2011-01-01 00:00:00'
df_filt = df[(df['Date Well Completed'] >= s_date) & (df['Date Well Completed'] < e_date)]

# Check
print(f"Earliest Well completion in dataset: {df_filt.agg(min('Date Well Completed')).collect()[0][0]}")
print(f"Latest Well completion in dataset: {df_filt.agg(max('Date Well Completed')).collect()[0][0]}")

Earliest Well completion in dataset: 1960-01-01 00:00:00
Latest Well completion in dataset: 2010-12-30 00:00:00


In [13]:
# Dropping columns and duplicates
df_drp = df.drop('__index_level_0__')
df_dupdrop = df_drp.dropDuplicates()
print(f'Number of rows after drop: {df_dupdrop.count()}')
print(f'Number of columns after drop: {len(df_dupdrop.columns)}')

Number of rows after drop: 260372
Number of columns after drop: 10


In [14]:
# Extracting month and year from the datetime column
from pyspark.sql.functions import month, year

df_date = df.withColumn('Well Completion Month', month(col('Date Well Completed'))) \
            .withColumn('Well Completion Year', year(col('Date Well Completed')))

df_date.select('Date Well Completed', 'Well Completion Month', 'Well Completion Year').show(5)

+-------------------+---------------------+--------------------+
|Date Well Completed|Well Completion Month|Well Completion Year|
+-------------------+---------------------+--------------------+
|1944-04-25 00:00:00|                    4|                1944|
|1944-04-25 00:00:00|                    4|                1944|
|1944-04-25 00:00:00|                    4|                1944|
|1928-09-11 00:00:00|                    9|                1928|
|1928-09-11 00:00:00|                    9|                1928|
+-------------------+---------------------+--------------------+
only showing top 5 rows



In [15]:
# Creating groupby information
df_gb = df.groupBy('Well Type').sum('Gas Produced, MCF', 'Oil Produced, bbl', 'Water Produced, bbl') \
                                .withColumnRenamed('sum(Gas Produced, MCF)', 'Total Gas Produced, MCF') \
                                .withColumnRenamed('sum(Oil Produced, bbl)', 'Total Oil Produced, bbl') \
                                .withColumnRenamed('sum(Water Produced, bbl)', 'Total Water Produced, bbl')

df_gb.show()

+--------------------+-----------------------+-----------------------+-------------------------+
|           Well Type|Total Gas Produced, MCF|Total Oil Produced, bbl|Total Water Produced, bbl|
+--------------------+-----------------------+-----------------------+-------------------------+
|          Not Listed|               161574.0|                   94.0|                    145.0|
|         Oil Wildcat|              1319890.6|     101150.23999999999|                 205198.0|
|       Gas Extension|         2.0435561175E8|                    1.0|                 668694.1|
|               Brine|                    0.0|                    0.0|                      0.0|
|       Stratigraphic|                    0.0|                    0.0|                      0.0|
|  Monitoring Storage|            1.4969839E7|                    0.0|                   4126.0|
|         Dry Wildcat|              2916556.0|                    0.0|                 340052.0|
|            Disposal|        

In [16]:
# Formatting numerical entries
from pyspark.sql.functions import format_number

df_format = df_gb.withColumn('Total Gas Produced, MCF', format_number('Total Gas Produced, MCF', 0)) \
                  .withColumn('Total Oil Produced, bbl', format_number('Total Oil Produced, bbl', 0)) \
                  .withColumn('Total Water Produced, bbl', format_number('Total Water Produced, bbl', 0))

df_format.show()

+--------------------+-----------------------+-----------------------+-------------------------+
|           Well Type|Total Gas Produced, MCF|Total Oil Produced, bbl|Total Water Produced, bbl|
+--------------------+-----------------------+-----------------------+-------------------------+
|          Not Listed|                161,574|                     94|                      145|
|         Oil Wildcat|              1,319,891|                101,150|                  205,198|
|       Gas Extension|            204,355,612|                      1|                  668,694|
|               Brine|                      0|                      0|                        0|
|       Stratigraphic|                      0|                      0|                        0|
|  Monitoring Storage|             14,969,839|                      0|                    4,126|
|         Dry Wildcat|              2,916,556|                      0|                  340,052|
|            Disposal|        
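
`format_number` returns strings, so the formatted columns are good for display but no longer sort or add up as numbers. The plain-Python sketch below mimics the formatting with Python's own `format` (a stand-in, not Spark's implementation) on three of the totals shown above, and shows how the ordering changes once the values are text.

```python
# Three gas totals taken from the table above
raw_totals = [161574.0, 1319890.6, 94.0]

# Thousands separators and no decimals, similar in spirit to format_number(col, 0)
formatted = [format(value, ',.0f') for value in raw_totals]

numeric_order = sorted(raw_totals)   # sorted by magnitude
text_order = sorted(formatted)       # sorted character by character

print(formatted)
print(numeric_order)
print(text_order)
```

Because of this, it is usually safer to keep the numeric DataFrame (`df_gb`) for further calculations and treat the formatted one as a presentation copy.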

In [17]:
# Creating columns based on other columns
from pyspark.sql.functions import when

# Set the gas total to 0 for 'Oil Extension' wells and keep it unchanged otherwise
df_new = df_format.withColumn('Total Gas Produced, MCF (edited)', when(col('Well Type') == 'Oil Extension', 0)
                                                                    .otherwise(col('Total Gas Produced, MCF')))
df_new.show()

+--------------------+-----------------------+-----------------------+-------------------------+--------------------------------+
|           Well Type|Total Gas Produced, MCF|Total Oil Produced, bbl|Total Water Produced, bbl|Total Gas Produced, MCF (edited)|
+--------------------+-----------------------+-----------------------+-------------------------+--------------------------------+
|          Not Listed|                161,574|                     94|                      145|                         161,574|
|         Oil Wildcat|              1,319,891|                101,150|                  205,198|                       1,319,891|
|       Gas Extension|            204,355,612|                      1|                  668,694|                     204,355,612|
|               Brine|                      0|                      0|                        0|                               0|
|       Stratigraphic|                      0|                      0|                    

In [18]:
# Finding the largest gas production value (useful when choosing band boundaries)
from pyspark.sql.functions import max

df.agg(max('Gas Produced, MCF')).collect()[0][0]

6474551.0

In [19]:
# Banding Columns
from pyspark.sql.functions import lit, create_map
from pyspark.ml.feature import Bucketizer
from itertools import chain

boundaries = [float('-inf'), 1000, 10000, 100000, 1000000, float('inf')]
labels = {0.0: 'tiny', 1.0: 'small', 2.0: 'medium', 3.0: 'large', 4.0: 'massive'}

bucketiser = Bucketizer(splits=boundaries, inputCol = 'Gas Produced, MCF', outputCol = 'Binned Gas Produced, MCF')
df_bin = bucketiser.transform(df)

mapping = create_map([lit(x) for x in chain(*labels.items())])
df_lbl = df_bin.withColumn('Banded Gas Produced, MCF', mapping.getItem(col('Binned Gas Produced, MCF')))
df_lbl = df_lbl.drop('Binned Gas Produced, MCF')

df_lbl.groupBy('Banded Gas Produced, MCF').min('Gas Produced, MCF').orderBy('min(Gas Produced, MCF)').show()
df_lbl.groupBy('Banded Gas Produced, MCF').max('Gas Produced, MCF').orderBy('max(Gas Produced, MCF)').show()
df_lbl.groupBy('Banded Gas Produced, MCF').count().show()



+------------------------+----------------------+
|Banded Gas Produced, MCF|min(Gas Produced, MCF)|
+------------------------+----------------------+
|                    tiny|                   0.0|
|                   small|                1000.0|
|                  medium|               10000.0|
|                   large|              100054.0|
|                 massive|             1004548.0|
+------------------------+----------------------+

+------------------------+----------------------+
|Banded Gas Produced, MCF|max(Gas Produced, MCF)|
+------------------------+----------------------+
|                    tiny|                 999.0|
|                   small|                9997.0|
|                  medium|               99816.0|
|                   large|              999670.0|
|                 massive|             6474551.0|
+------------------------+----------------------+

+------------------------+------+
|Banded Gas Produced, MCF| count|
+------------------------+----
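
The min and max tables above suggest how `Bucketizer` treats the boundaries: each band includes its lower bound and excludes its upper bound, so 1000.0 lands in 'small' rather than 'tiny'. A plain-Python sketch of that rule, using `bisect` as a stand-in for Spark, makes it easy to try a few values without running a job. The function name `band` is hypothetical, and the sketch ignores NaN handling.

```python
from bisect import bisect_right

boundaries = [float('-inf'), 1000, 10000, 100000, 1000000, float('inf')]
labels = ['tiny', 'small', 'medium', 'large', 'massive']


def band(value):
    """Return the label of the half-open interval [lower, upper) containing value."""
    # bisect_right counts the boundaries <= value; subtracting 1 gives the bucket index
    index = bisect_right(boundaries, value) - 1
    # The last bucket also includes its upper bound
    return labels[min(index, len(labels) - 1)]


for sample in [999.0, 1000.0, 99816.0, 100054.0, 6474551.0]:
    print(sample, band(sample))
```

The sample values are the band minima and maxima reported above, so the printed labels can be compared directly with the Spark output.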

In [20]:
# Adding a random column
from pyspark.sql.functions import rand

bins = 5
# rand() is uniform on [0, 1), so scaling by bins and truncating gives integers 0..bins-1
df_rnd = df.withColumn('rand_col', (rand() * bins).cast('int'))

df_rnd.groupBy('rand_col').count().orderBy('rand_col').show()

+--------+-----+
|rand_col|count|
+--------+-----+
|       0|51781|
|       1|52384|
|       2|52231|
|       3|51918|
|       4|52060|
+--------+-----+



In [21]:
# Joining data
df_join = df.join(df_new.select('Well Type', 'Total Gas Produced, MCF'), on='Well Type', how='left')
df_join.show()

+---------------+--------------+-----------------+-------------------+-----------------+------------------+----------------+-------------------+-------------------+--------------------+-----------------+-----------------------+
|      Well Type|Reporting Year|Gas Produced, MCF|Water Produced, bbl|Oil Produced, bbl| Surface Longitude|Surface Latitude|          Well Name|Date Well Completed|         Well Status|__index_level_0__|Total Gas Produced, MCF|
+---------------+--------------+-----------------+-------------------+-----------------+------------------+----------------+-------------------+-------------------+--------------------+-----------------+-----------------------+
|    Gas Wildcat|          2003|              0.0|                0.0|              0.0|         -78.01117|         42.0834|Vossler Katherine 1|1944-04-25 00:00:00|Plugged and Aband...|                0|            175,263,922|
|    Gas Wildcat|          2001|              0.0|                0.0|              0.0|

## End Spark

In [22]:
spark.stop()