# Spark on Colab

⚡️ Apache Spark is a powerful distributed computing framework designed for processing large-scale data efficiently. It offers a wide range of functionalities for data manipulation and machine learning. As a Data Scientist, learning Spark can help you handle massive datasets and perform complex analyses with ease.

☁️ Google Colab, on the other hand, provides a convenient and free platform for running Python code. It is particularly suited to those who don't want to install anything locally or who lack powerful hardware (e.g. beginners). It's a hosted Jupyter Notebook service, so it inherits the advantages of Jupyter, such as interactivity, visualizations, and documentation capabilities.

🤝 Combining Spark with Google Colab can enhance your data science projects by leveraging Spark's scalability and Colab's user-friendly interface.





## Installing Spark



There are a number of ways to get Spark. The easiest is to install `pyspark`, the Python API for Apache Spark, and then use the `findspark` library to locate the Spark installation so that it can be imported. We also install `pyngrok`, which is used later to reach the Spark UI.

In [1]:
!pip install pyspark --quiet
!pip install findspark --quiet
!pip install pyngrok --quiet


In [2]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName('ColabTest') \
        .getOrCreate()
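
If the imports above fail, one quick thing to check is whether the packages are actually visible to the notebook's Python interpreter. The snippet below is a minimal sketch using only the standard library; `installed_version` is a hypothetical helper name introduced here, not part of `pyspark` or `findspark`.

```python
from importlib import metadata, util


def installed_version(package):
    """Return the installed version of `package`, or None if it cannot be imported.

    Returns the string "unknown" when the module is importable but has no
    distribution metadata (for example, a standard-library module).
    """
    if util.find_spec(package) is None:
        return None
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return "unknown"


# Report on the three packages installed at the top of this notebook
for name in ("pyspark", "findspark", "pyngrok"):
    print(name, installed_version(name))
```

A value of `None` for any of the three suggests the corresponding `pip install` did not complete in this runtime.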

Apache Spark provides a suite of web user interfaces (UIs) that you can use to monitor the status and resource consumption of your Spark cluster. Because we're using Colab, the Spark UI is served on the localhost of the Colab virtual machine, which our browser can't reach directly. To make it available we need the help of `ngrok`, a platform that creates a secure tunnel to locally hosted applications using a reverse proxy. This step is optional, but it's free and quite interesting.

In [None]:
from pyngrok import ngrok, conf
import getpass

print('Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken')
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f'ngrok tunnel {public_url}')
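
If the tunnel URL shows an error page, it may be worth confirming that something is actually listening on the UI port inside the Colab machine. The following is a small sketch using only Python's `socket` module; `is_port_open` is a hypothetical helper introduced for illustration, and it assumes the UI is on port 4040 as in the cell above (Spark may pick a different port if that one is already taken).

```python
import socket


def is_port_open(port, host="127.0.0.1", timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        # connect_ex returns 0 on success and an error code otherwise
        return sock.connect_ex((host, port)) == 0


# Assumption: the Spark UI uses the same port that was passed to ngrok above
print(f"Something is listening on port 4040: {is_port_open(4040)}")
```

If this prints `False`, the Spark session may not be running, or the UI may be bound to another port.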

## Data Exploration

We'll download a Plotly dataset to play around with.

In [7]:
# Downloading the oil and gas dataset
!wget https://raw.githubusercontent.com/plotly/datasets/master/oil-and-gas.parquet --quiet

In [10]:
# Reading in the dataset
df = spark.read.parquet('oil-and-gas.parquet')

In [28]:
# Learning about the dataset
print(f'Number of rows: {df.count()}')
print(f'Number of columns: {len(df.columns)}')
print('')
print('Column name, type:')
for name, dtype in df.dtypes:
  print(' ', name, ', ', dtype)

Number of rows: 260374
Number of columns: 11

Column name, type:
  Reporting Year ,  bigint
  Gas Produced, MCF ,  double
  Water Produced, bbl ,  double
  Oil Produced, bbl ,  double
  Surface Longitude ,  double
  Surface Latitude ,  double
  Well Name ,  string
  Well Type ,  string
  Date Well Completed ,  timestamp_ntz
  Well Status ,  string
  __index_level_0__ ,  bigint


In [32]:
# Learning about the columns
print('Column name, no. unique levels:')
for i in df.columns:
  print(' ', i, ', ', df.select(i).distinct().count())

Column name, no. unique levels:
  Reporting Year ,  31
  Gas Produced, MCF ,  16828
  Water Produced, bbl ,  1730
  Oil Produced, bbl ,  910
  Surface Longitude ,  13469
  Surface Latitude ,  11930
  Well Name ,  14228
  Well Type ,  17
  Date Well Completed ,  8125
  Well Status ,  15
  __index_level_0__ ,  260374


In [43]:
# Learning about missing data
# Alias `sum` so that it doesn't shadow Python's built-in sum()
from pyspark.sql.functions import col, sum as spark_sum

total_rows = df.count()
# One 0/1 indicator per original column: 1 where the value is null
missing_counts = df.select([col(column).isNull().cast('int').alias(column) for column in df.columns])

print('Column name, no. missing entries:')
for i in df.columns:
  missing_sum = missing_counts.agg(spark_sum(i)).collect()[0][0]
  print(' ', i, ', ', missing_sum)

Column name, no. missing entries:
  Reporting Year ,  0
  Gas Produced, MCF ,  0
  Water Produced, bbl ,  0
  Oil Produced, bbl ,  0
  Surface Longitude ,  0
  Surface Latitude ,  0
  Well Name ,  0
  Well Type ,  0
  Date Well Completed ,  0
  Well Status ,  0
  __index_level_0__ ,  0


In [46]:
# Learning about column levels
df.groupBy('Well Type').count().orderBy(col('count').desc()).show()

+--------------------+------+
|           Well Type| count|
+--------------------+------+
|     Gas Development|186942|
|     Oil Development| 52142|
|         Gas Wildcat|  7376|
|       Gas Extension|  5737|
|       Oil Injection|  4707|
|         Oil Wildcat|  1288|
|         Dry Wildcat|   879|
|            Dry Hole|   511|
|  Monitoring Storage|   308|
|             Storage|   153|
|          Not Listed|    90|
|       Oil Extension|    82|
|            Disposal|    58|
|       Stratigraphic|    56|
|          Geothermal|    24|
|Monitoring Miscel...|    11|
|               Brine|    10|
+--------------------+------+



In [50]:
# Learning about the date range of well completions
# Alias `min` and `max` so that they don't shadow Python's built-ins
from pyspark.sql.functions import min as spark_min, max as spark_max

print(f"Earliest Well completion in dataset: {df.agg(spark_min('Date Well Completed')).collect()[0][0]}")
print(f"Latest Well completion in dataset: {df.agg(spark_max('Date Well Completed')).collect()[0][0]}")

Earliest Well completion in dataset: 1881-08-01 00:00:00
Latest Well completion in dataset: 2016-06-07 00:00:00
