In [1]:
!pip install pyspark==3.3.1 py4j==0.10.9.5

Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845493 sha256=faffd183ce37879a409a2f15160647d4ae2e97a75ac2057cd59895ba63bd992f
  Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninst

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf

conf = SparkConf()
conf.set("spark.app.name", "PySpark DataFrame #3")
conf.set("spark.master", "local[*]")

spark = SparkSession.builder\
        .config(conf=conf)\
        .getOrCreate()

In [3]:
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt

--2024-06-18 07:10:58--  https://s3-geospatial.s3.us-west-2.amazonaws.com/transfer_cost.txt
Resolving s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)... 52.218.181.33, 52.92.197.114, 52.92.233.226, ...
Connecting to s3-geospatial.s3.us-west-2.amazonaws.com (s3-geospatial.s3.us-west-2.amazonaws.com)|52.218.181.33|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 286779 (280K) [text/plain]
Saving to: ‘transfer_cost.txt’


2024-06-18 07:10:59 (1.65 MB/s) - ‘transfer_cost.txt’ saved [286779/286779]



In [4]:
!ls -tl

total 288
drwxr-xr-x 1 root root   4096 May  2 13:25 sample_data
-rw-r--r-- 1 root root 286779 Apr 24  2022 transfer_cost.txt


In [5]:
!head -5 transfer_cost.txt

On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling
On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today
On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today


In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

schema = StructType([ StructField("text", StringType(), True)])
transfer_cost_df = spark.read.schema(schema).text("transfer_cost.txt")

In [7]:
transfer_cost_df.show(truncate=False)

+---------------------------------------------------------------------------+
|text                                                                       |
+---------------------------------------------------------------------------+
|On 2021-01-04 the cost per ton from 85001 to 85002 is $28.32 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85004 is $25.68 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85007 is 19.86 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85001 to 85007 is 20.52 at Haul Today  |
|On 2021-01-04 the cost per ton from 85001 to 85010 is 20.72 at Haul Today  |
|On 2021-01-04 the cost per ton from 85001 to 85012 is $18.98 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85001 to 85013 is 26.64 at Haul Today  |
|On 2021-01-04 the cost per ton from 85001 to 85020 is 26.34 at ABC Hauling |
|On 2021-01-04 the cost per ton from 85001 to 85021 is $20.15 at ABC Hauling|
|On 2021-01-04 the cost per ton from 85002 to 85001 is 21.57 at 

In [8]:
from pyspark.sql.functions import *
regex_str = r'On (\S+) the cost per ton from (\d+) to (\d+) is (\S+) at (.*)'

df_with_new_columns = transfer_cost_df\
    .withColumn('week', regexp_extract('text', regex_str, 1))\
    .withColumn('departure_zipcode', regexp_extract(column('text'), regex_str, 2))\
    .withColumn('arrival_zipcode', regexp_extract(transfer_cost_df.text, regex_str, 3))\
    .withColumn('cost', regexp_extract(col('text'), regex_str, 4))\
    .withColumn('vendor', regexp_extract(col('text'), regex_str, 5))

In [9]:
df_with_new_columns.printSchema()

root
 |-- text: string (nullable = true)
 |-- week: string (nullable = true)
 |-- departure_zipcode: string (nullable = true)
 |-- arrival_zipcode: string (nullable = true)
 |-- cost: string (nullable = true)
 |-- vendor: string (nullable = true)



In [10]:
final_df = df_with_new_columns.drop("text")

In [11]:
final_df.write.csv("extracted.csv")

In [12]:
!ls -tl

total 292
drwxr-xr-x 2 root root   4096 Jun 18 07:16 extracted.csv
drwxr-xr-x 1 root root   4096 May  2 13:25 sample_data
-rw-r--r-- 1 root root 286779 Apr 24  2022 transfer_cost.txt


In [19]:
!ls -tl extracted.csv/

total 156
-rw-r--r-- 1 root root      0 Jun 18 07:16 _SUCCESS
-rw-r--r-- 1 root root 156423 Jun 18 07:16 part-00000-0a34e7ae-1cfa-4807-b0e5-d5a92c076e47-c000.csv


In [24]:
!head -5 extracted.csv/part-00000-0a34e7ae-1cfa-4807-b0e5-d5a92c076e47-c000.csv

2021-01-04,85001,85002,$28.32,ABC Hauling
2021-01-04,85001,85004,$25.68,ABC Hauling
2021-01-04,85001,85007,19.86,ABC Hauling
2021-01-04,85001,85007,20.52,Haul Today
2021-01-04,85001,85010,20.72,Haul Today


In [25]:
final_df.write.format("json").save("extracted.json")

AnalysisException: path file:/content/extracted.json already exists.

In [22]:
!ls -tl extracted.json/

total 428
-rw-r--r-- 1 root root      0 Jun 18 07:18 _SUCCESS
-rw-r--r-- 1 root root 436305 Jun 18 07:18 part-00000-ccb4b4ee-c83a-4a9b-94ce-9dd0ac462620-c000.json


In [23]:
!head -1 extracted.json/part-00000-ccb4b4ee-c83a-4a9b-94ce-9dd0ac462620-c000.json

{"week":"2021-01-04","departure_zipcode":"85001","arrival_zipcode":"85002","cost":"$28.32","vendor":"ABC Hauling"}
