# Working with Apache Hudi Deltastreamer

The HoodieDeltaStreamer utility, part of hudi-utilities-bundle, provides a way to ingest data from sources such as DFS or Kafka.

In this notebook, you will learn to use the DeltaStreamer utility to bulk insert data into a Hudi dataset as a Copy on Write (CoW) table and then apply a batch of upserts.

We will query the table with Amazon Athena and inspect its commits with the Hudi CLI to verify that the initial load and the subsequent updates are reflected in our data lake on Amazon S3.

Let's get started!

## Generate Data

### Install Python Faker 

In [1]:
!pip install Faker

Collecting Faker
  Downloading Faker-9.2.0-py3-none-any.whl (1.2 MB)
     |████████████████████████████████| 1.2 MB 23.7 MB/s eta 0:00:01
Collecting text-unidecode==1.3
  Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
     |████████████████████████████████| 78 kB 16.0 MB/s eta 0:00:01
Installing collected packages: text-unidecode, Faker
Successfully installed Faker-9.2.0 text-unidecode-1.3
You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.


### Fake Profile Generator

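The fake profile generator uses Python's [Faker](https://faker.readthedocs.io/en/master/index.html) library. Let's define functions that generate a given number of random person profiles and write them to S3 as JSON files, plus a function that generates address updates for those profiles.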

In [2]:
import json

import boto3
from faker import Faker
from faker.providers import date_time, credit_card


# Initialize the Faker library and the S3 client
fake = Faker()
fake.add_provider(date_time)
fake.add_provider(credit_card)

s3 = boto3.resource('s3')

# Write the fake profile data to an S3 bucket
# Replace with your own bucket
s3_bucket = "vasveena-test-demo"
s3_load_prefix = 'hudi-ds/inputdata/'
s3_update_prefix = 'hudi-ds/updates/'

# Number of records in each file and number of files
# Adjust to your needs; the commented-out values produce 40MB files
#num_records = 150000
#num_files = 50

num_records = 10000
num_files = 15

def generate_bulk_data():
    '''
    Generates num_files JSON files with num_records profiles each
    and writes them under s3_load_prefix
    '''
    for i in range(num_files):
        fake_profile_data = fake_profile_generator(num_records, fake)
        filename = 'profile_' + str(i + 1) + '.json'
        s3key = s3_load_prefix + filename

        body = ''.join(dumps_lines(fake_profile_data))
        s3.Object(s3_bucket, s3key).put(Body=body.encode('utf-8'))

def generate_updates():
    '''
    Generates updates for the profiles: every record in profile_1.json
    through profile_<num_files - 1>.json gets a new street_address (its id
    is kept) and the result is written under s3_update_prefix
    '''
    # range(1, num_files) covers files 1 .. num_files - 1; the last file is not updated
    update_file_list = ['profile_' + str(i) + '.json' for i in range(1, num_files)]

    for f in update_file_list:
        print(f)
        obj = s3.Object(s3_bucket, s3_load_prefix + f)
        profile_data = obj.get()['Body'].read().decode('utf-8')

        # Each line of the file is one JSON record
        json_data = [json.loads(line) for line in profile_data.splitlines() if line.strip()]

        fake_profile_data = []
        for rec in json_data:
            # Generate a new address for the same record
            print("old address: " + rec['street_address'])
            rec['street_address'] = fake.address()
            print("new address: " + rec['street_address'])
            fake_profile_data.append(rec)

        body = ''.join(dumps_lines(fake_profile_data))
        s3.Object(s3_bucket, s3_update_prefix + f).put(Body=body.encode('utf-8'))

def fake_profile_generator(length, fake, new_address=""):
    """
    Yields `length` fake profiles; if new_address is set, every profile uses it
    """
    for _ in range(length):
        yield {'Name': fake.name(),
               'phone': fake.phone_number(),
               'job': fake.job(),
               'company': fake.company(),
               'ssn': fake.ssn(),
               'street_address': (new_address if new_address else fake.address()),
               'dob': (fake.date_of_birth(tzinfo=None, minimum_age=21, maximum_age=105).isoformat()),
               'email': fake.email(),
               'ts': (fake.date_time_between(start_date='-10y', end_date='now', tzinfo=None).isoformat()),
               'credit_card': fake.credit_card_number(),
               'record_id': fake.pyint(),
               'id': fake.uuid4()}

def dumps_lines(objs):
    """
    Serializes each object as one compact JSON line (JSON Lines format)
    """
    for obj in objs:
        yield json.dumps(obj, separators=(',', ':')) + '\n'

### Start the data generator

The following code runs the fake data generator, which writes `num_files` JSON files of `num_records` records each (both configurable above) to the specified S3 bucket.

In [3]:
generate_bulk_data()
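DeltaStreamer's `JsonDFSSource` reads the input files as text and treats each line as one JSON record, which is why `dumps_lines` writes compact JSON Lines. Faker's en_US addresses usually contain a newline between street and city; `json.dumps` escapes it as `\n`, so each record still occupies exactly one line. The sketch below checks this locally without touching S3; the helper names and sample records are hypothetical.

```python
import json


def to_json_lines(records):
    """Serialize records the way dumps_lines does: one compact JSON object per line."""
    return ''.join(json.dumps(r, separators=(',', ':')) + '\n' for r in records)


def from_json_lines(text):
    """Parse JSON Lines text back into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Hypothetical sample records using a subset of the generator's fields;
# the addresses contain a real newline, like Faker's addresses
sample = [
    {'id': 'a1', 'Name': 'Jane Doe', 'street_address': '1 Main St\nSpringfield, IL 62701',
     'ts': '2020-01-01T00:00:00'},
    {'id': 'b2', 'Name': 'John Roe', 'street_address': '2 Oak Ave\nDover, DE 19901',
     'ts': '2021-06-30T12:00:00'},
]

text = to_json_lines(sample)
print(text.count('\n'))                 # 2: embedded newlines are escaped, one line per record
print(from_json_lines(text) == sample)  # True: the round trip is lossless
```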

## Copy the Hudi libraries to HDFS on the EMR cluster

Prerequisite: for the following steps to work, the EMR cluster must have been launched with the permissions required by **AWS Systems Manager Session Manager**.

1. In the AWS Console, type SSM in the search box and open the **AWS Systems Manager console**
2. In the left navigation pane, select **Session Manager** under the **Instances and Nodes** section
3. Click **Start session**; the cluster's EC2 instances should be listed (two in this example)
4. Select the instance ID of the **EMR master node** and click **Start session**
5. In the terminal, switch to the *hadoop* user and change to its home directory
6. Create an HDFS directory for the Hudi libraries, copy the jars into it, and list the directory to verify the copy
 
```bash
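# Switch to the hadoop user and go to its home directory
sudo su hadoop
cd

# Create an HDFS directory for the Hudi libraries and copy the jars into it
# (the httpclient/httpcore versions match this cluster's EMR release; check /usr/lib/spark/jars on yours)
hdfs dfs -mkdir -p /apps/hudi/lib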
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-utilities-bundle.jar /apps/hudi/lib/hudi-utilities-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/jars/httpclient-4.5.9.jar /apps/hudi/lib/httpclient-4.5.9.jar
hdfs dfs -copyFromLocal /usr/lib/spark/jars/httpcore-4.4.11.jar /apps/hudi/lib/httpcore-4.4.11.jar
hdfs dfs -ls /apps/hudi/lib/
Found 5 items
-rw-r--r--   1 hadoop hadoop     774384 2021-10-11 02:51 /apps/hudi/lib/httpclient-4.5.9.jar
-rw-r--r--   1 hadoop hadoop     326874 2021-10-11 02:51 /apps/hudi/lib/httpcore-4.4.11.jar
-rw-r--r--   1 hadoop hadoop   35041795 2021-10-11 02:51 /apps/hudi/lib/hudi-spark-bundle.jar
-rw-r--r--   1 hadoop hadoop   39996793 2021-10-11 02:51 /apps/hudi/lib/hudi-utilities-bundle.jar
-rw-r--r--   1 hadoop hadoop     161984 2021-10-11 02:51 /apps/hudi/lib/spark-avro.jar
```

## Run DeltaStreamer to write a Copy on Write (COW) table

We will now run the DeltaStreamer utility as an EMR step to write the JSON data generated above into a Hudi dataset. To do that, we need the following:

* A properties file on the local file system or DFS (here, S3) with configurations for the Hudi client, schema provider, key generator, and data source
* A schema file for the source dataset
* A schema file for the target dataset
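The notebook doesn't show the contents of these files. As a hedged sketch, the Python below renders a plausible properties file and an Avro schema that could serve as both source and target schema. It rests on assumptions: the record key is the `id` field (the `_hoodie_record_key` values in the Athena output later are UUIDs), the table is non-partitioned (`_hoodie_partition_path` is empty there), and the field types are guessed from `fake_profile_generator`. The helper names, the `profile.avsc` path, and the schema record name are hypothetical; check the property names against the Hudi version on your EMR release.

```python
import json

# Field names come from fake_profile_generator; the types are assumptions
# (record_id comes from fake.pyint(), so it is declared as an Avro int)
PROFILE_FIELDS = [
    ('Name', 'string'), ('phone', 'string'), ('job', 'string'),
    ('company', 'string'), ('ssn', 'string'), ('street_address', 'string'),
    ('dob', 'string'), ('email', 'string'), ('ts', 'string'),
    ('credit_card', 'string'), ('record_id', 'int'), ('id', 'string'),
]


def render_avro_schema(name='profile'):
    """Build an Avro record schema (as JSON text) for the generated profiles."""
    schema = {
        'type': 'record',
        'name': name,
        'fields': [{'name': n, 'type': t} for n, t in PROFILE_FIELDS],
    }
    return json.dumps(schema, indent=2)


def render_properties(bucket, source_prefix, schema_uri):
    """Render DeltaStreamer properties as key=value lines (hypothetical values)."""
    props = {
        # Assumed: records are keyed by the UUID in the id field
        'hoodie.datasource.write.recordkey.field': 'id',
        # Assumed: non-partitioned table
        'hoodie.datasource.write.partitionpath.field': '',
        'hoodie.datasource.write.keygenerator.class':
            'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
        # Where JsonDFSSource looks for input files
        'hoodie.deltastreamer.source.dfs.root': f's3://{bucket}/{source_prefix}',
        # Used by FilebasedSchemaProvider
        'hoodie.deltastreamer.schemaprovider.source.schema.file': schema_uri,
        'hoodie.deltastreamer.schemaprovider.target.schema.file': schema_uri,
    }
    return ''.join(f'{k}={v}\n' for k, v in props.items())


print(render_properties('my-bucket', 'hudi-ds/inputdata/',
                        's3://my-bucket/hudi-ds/config/profile.avsc'))
print(render_avro_schema())
```

For the upsert run, calling `render_properties` with `source_prefix='hudi-ds/updates/'` would give a plausible counterpart to the `json-deltastreamer_upsert.properties` file used later. Write both outputs to S3 at the paths that `--props` and the schema-file properties reference.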

To run DeltaStreamer, add an EMR step with the AWS CLI:

```
! ~/.local/bin/aws emr add-steps --cluster-id j-XXXXXXX --steps Type=Spark,Name="Deltastreamer COW - Bulk Insert",ActionOnFailure=CONTINUE,Args=[--jars,hdfs:///apps/hudi/lib/*.jar,--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://<my-bucket>/hudi-ds/config/json-deltastreamer.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://<my-bucket>/hudi-ds-output/person-profile-out1,--target-table,person_profile_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,BULK_INSERT] --region us-east-1

```


Replace the following values in the command above:

1. `--cluster-id`: the ID of your EMR cluster
2. `--props`: the S3 bucket name in the properties file path
3. `--target-base-path`: the S3 bucket name in the output path
4. After replacing the values, copy the entire command and run it in a notebook cell (the cells below first make sure the AWS CLI is installed under `~/.local/bin`)
5. If the values are replaced correctly, you should see a step ID displayed as the output
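Rather than editing the long command by hand, you can assemble it from variables. This is a minimal sketch assuming you only need to vary the cluster ID, bucket, operation, properties file, and step name; `build_add_steps_command` is a hypothetical helper whose Spark arguments simply mirror the command above. `shlex.quote` wraps the `--steps` value in single quotes, so the shell passes it to the CLI unchanged, including the spaces in the step name, the `*.jar` glob, and the brackets.

```python
import shlex


def build_add_steps_command(cluster_id, bucket, op='BULK_INSERT',
                            props_key='hudi-ds/config/json-deltastreamer.properties',
                            step_name='Deltastreamer COW - Bulk Insert',
                            region='us-east-1'):
    """Return an `aws emr add-steps` command line for a DeltaStreamer run (hypothetical helper)."""
    # Spark arguments, in the same order as the command above
    spark_args = [
        '--jars', 'hdfs:///apps/hudi/lib/*.jar',
        '--class', 'org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer',
        'hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar',
        '--props', f's3://{bucket}/{props_key}',
        '--table-type', 'COPY_ON_WRITE',
        '--source-class', 'org.apache.hudi.utilities.sources.JsonDFSSource',
        '--source-ordering-field', 'ts',
        '--target-base-path', f's3://{bucket}/hudi-ds-output/person-profile-out1',
        '--target-table', 'person_profile_cow',
        '--schemaprovider-class', 'org.apache.hudi.utilities.schema.FilebasedSchemaProvider',
        '--op', op,
    ]
    # AWS CLI shorthand for a Spark step; Args is a comma-separated list
    steps = (f'Type=Spark,Name={step_name},ActionOnFailure=CONTINUE,'
             f'Args=[{",".join(spark_args)}]')
    return ' '.join([
        '~/.local/bin/aws', 'emr', 'add-steps',
        '--cluster-id', shlex.quote(cluster_id),
        '--steps', shlex.quote(steps),
        '--region', shlex.quote(region),
    ])


print(build_add_steps_command('j-XXXXXXX', 'my-bucket'))
```

For the upsert step, pass `op='UPSERT'`, a `props_key` pointing at your upsert properties file, and a different `step_name`. Paste the printed command into a cell prefixed with `!`, or into a terminal.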



In [18]:
!pip install awscli --upgrade --user

Requirement already up-to-date: awscli in ./.local/lib/python3.7/site-packages (1.20.58)
You should consider upgrading via the '/opt/conda/bin/python3.7 -m pip install --upgrade pip' command.


In [25]:
! ls ~/.local/bin/aws

/home/jovyan/.local/bin/aws


In [29]:
! ~/.local/bin/aws emr add-steps --cluster-id j-1GMG9EJ4Z4ZL0 --steps Type=Spark,Name="Deltastreamer COW - Bulk Insert",ActionOnFailure=CONTINUE,Args=[--jars,hdfs:///apps/hudi/lib/*.jar,--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://vasveena-test-demo/hudi-ds/config/json-deltastreamer.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://vasveena-test-demo/hudi-ds-output/person-profile-out1,--target-table,person_profile_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,BULK_INSERT] --region us-east-1

{
    "StepIds": [
        "s-22UV4POBZ7NZQ"
    ]
}


## Query the Hudi Dataset

Now let's list the target base path on S3:

```
$ aws s3 ls s3://<my-bucket>/hudi-ds-output/person-profile-out1/
                           PRE .hoodie/
2021-10-11 02:55:31          0 .hoodie_$folder$
2021-10-11 02:55:53         93 .hoodie_partition_metadata
2021-10-11 02:55:55    2121824 37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-4-34_20211011025540.parquet
2021-10-11 02:55:55    1955015 4fefd569-0e95-4457-b6cb-ef49a96549f1-0_7-4-39_20211011025540.parquet
2021-10-11 02:55:56    2132101 50d1f972-f25c-47ee-a381-28a2d600b028-0_8-4-40_20211011025540.parquet
2021-10-11 02:55:56    2186257 574f9dcb-1d46-4357-8d1f-8c5145cf6351-0_0-4-32_20211011025540.parquet
2021-10-11 02:55:55    1805544 b26e465b-2238-4ba7-a603-86105b42e5c9-0_6-4-38_20211011025540.parquet
2021-10-11 02:55:56    2479839 b7b62846-fe0d-4836-9146-4a2c412c4fa9-0_3-4-35_20211011025540.parquet
2021-10-11 02:55:56    2353476 d68bb0f1-4c72-45ec-8cd0-77470f8b2826-0_4-4-36_20211011025540.parquet
2021-10-11 02:55:56    2540438 ebbc234b-d953-4857-8f22-ff792bd919ed-0_9-4-41_20211011025540.parquet
2021-10-11 02:55:56    2984486 f49805bb-ccd0-446d-b1bc-36a12bb9a1fe-0_1-4-33_20211011025540.parquet
2021-10-11 02:55:55    2261358 f8e3b197-eb7a-4a83-8d42-2e68757ff199-0_5-4-37_20211011025540.parquet
```
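Each Parquet file in the listing is a Hudi base file named `<fileId>_<writeToken>_<instantTime>.parquet`: the file ID identifies the file group the file belongs to, and the instant time is the commit that wrote it. As we understand Hudi's naming, the write token combines the Spark task partition, stage, and attempt IDs. The sketch below splits a name into those parts; `parse_base_file_name` and `BaseFile` are hypothetical, and the date format assumes the 14-digit instants seen in this notebook.

```python
from datetime import datetime
from typing import NamedTuple


class BaseFile(NamedTuple):
    file_id: str       # file group ID: a UUID plus a "-<n>" suffix
    write_token: str   # <task partition>-<stage>-<task attempt>
    instant: datetime  # commit (instant) time that wrote this file


def parse_base_file_name(name):
    """Split a Hudi base file name of the form <fileId>_<writeToken>_<instant>.parquet."""
    stem, ext = name.rsplit('.', 1)
    if ext != 'parquet':
        raise ValueError(f'not a parquet base file: {name}')
    file_id, write_token, instant = stem.split('_')
    # The instants in this notebook have 14 digits: yyyyMMddHHmmss
    return BaseFile(file_id, write_token, datetime.strptime(instant, '%Y%m%d%H%M%S'))


f = parse_base_file_name(
    '37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-4-34_20211011025540.parquet')
print(f.file_id)      # 37d44679-8b9a-4f53-864d-d3efe81b538b-0
print(f.write_token)  # 2-4-34
print(f.instant)      # 2021-10-11 02:55:40
```

The instant 20211011025540 is the same value that appears in the `_hoodie_commit_time` column of the query results.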

To query the Hudi dataset, you can do any of the following:

- Open another SparkMagic notebook and run queries in Spark from SparkMagic cells
- SSH to the master node (or connect with SSM Session Manager if you launched your cluster with SSM permissions) and run queries using Hive or Presto
- Head to the Hue console on Amazon EMR and run queries
- Query using Amazon Athena or Amazon Redshift Spectrum (preferred)

Let's use Athena.

```

-- In the Athena console:

CREATE EXTERNAL TABLE `profile_cow`(
  `_hoodie_commit_time` string, 
  `_hoodie_commit_seqno` string, 
  `_hoodie_record_key` string, 
  `_hoodie_partition_path` string, 
  `_hoodie_file_name` string, 
  `Name` string, 
  `phone` string, 
  `job` string, 
  `company` string, 
  `ssn` string, 
  `street_address` string, 
  `dob` string, 
  `email` string, 
  `ts` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hudi.hadoop.HoodieParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://my-bucket/hudi-ds-output/person-profile-out1';

select * from profile_cow limit 2;

	_hoodie_commit_time	_hoodie_commit_seqno	_hoodie_record_key	_hoodie_partition_path	_hoodie_file_name	name	phone	job	company	ssn	street_address	dob	email	ts
1	20211011025540	20211011025540_2_1	3b288239-a6cf-4312-8044-cd984ddd1066		37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-4-34_20211011025540.parquet	David Foley	001-798-549-5064	Clinical psychologist	Hays LLC	711-86-5759	941 Charles Centers New Christina, WY 22122	1942-07-27	hwalsh@example.com	2012-02-22T04:08:36
2	20211011025540	20211011025540_2_2	3b28bd5c-d5fc-4694-837f-58fb62289775		37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-4-34_20211011025540.parquet	Eric Woods	001-655-150-8537x4698	Archaeologist	Henry Ltd	394-48-5755	93277 Laurie Trail Suite 421 Crawfordmouth, WY 86413	1954-11-23	grayjennifer@example.net	2017-08-27T00:06:25

-- Note the street_address of the first record: "941 Charles Centers New Christina, WY 22122"

select _hoodie_commit_time, street_address from profile_cow where _hoodie_record_key='3b288239-a6cf-4312-8044-cd984ddd1066';

_hoodie_commit_time	street_address
1	20211011025540	941 Charles Centers New Christina, WY 22122  

```

Let's now run an upsert and observe how the records change.


## Generate updates

In [None]:
generate_updates()

Check the generated files under the updates/ prefix:

```
$ aws s3 ls s3://vasveena-test-demo/hudi-ds/updates/
2021-10-11 03:56:03    3686203 profile_1.json
2021-10-11 03:56:09    3688069 profile_2.json
2021-10-11 03:56:14    3683899 profile_3.json
2021-10-11 03:56:19    3684258 profile_4.json

```
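Before running the upsert, it's worth confirming that the update files keep each record's key and only change `street_address`, since Hudi matches updates to existing rows by record key. This minimal sketch assumes the record key is the `id` field (the UUID-valued `_hoodie_record_key` in the earlier query suggests so); `load_json_lines` and `diff_updates` are hypothetical helpers working on the JSON Lines text of an input file and its update file.

```python
import json


def load_json_lines(text, key='id'):
    """Index the records of a JSON Lines document by `key`."""
    records = (json.loads(line) for line in text.splitlines() if line.strip())
    return {rec[key]: rec for rec in records}


def diff_updates(original_text, updated_text, key='id'):
    """Compare an input file with its update file, matching records on `key`.

    Returns (changed_fields, new_keys, missing_keys):
      changed_fields: names of fields whose values differ in matched records
      new_keys: keys only in the update file (these would become inserts)
      missing_keys: keys only in the original file (these stay untouched)
    """
    original = load_json_lines(original_text, key)
    updated = load_json_lines(updated_text, key)
    changed_fields = set()
    for k in original.keys() & updated.keys():
        old, new = original[k], updated[k]
        changed_fields.update(f for f in old.keys() | new.keys() if old.get(f) != new.get(f))
    return changed_fields, updated.keys() - original.keys(), original.keys() - updated.keys()


orig = '{"id":"a","street_address":"1 Main St","ts":"t1"}\n{"id":"b","street_address":"2 Oak Ave","ts":"t2"}\n'
upd = '{"id":"a","street_address":"9 Elm Rd","ts":"t1"}\n{"id":"b","street_address":"8 Pine Ct","ts":"t2"}\n'
print(diff_updates(orig, upd))  # ({'street_address'}, set(), set())
```

To check real files, read an input file and its update with `s3.Object(s3_bucket, key).get()['Body'].read().decode('utf-8')`, as `generate_updates` does.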

## Run DeltaStreamer to apply updates

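We will now run DeltaStreamer again with `--op UPSERT` to apply the updates generated in the previous step to the same table. Note that this run uses a separate properties file, json-deltastreamer_upsert.properties.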

```

! ~/.local/bin/aws emr add-steps --cluster-id j-XXXXXXX --steps Type=Spark,Name="Deltastreamer Profile Upserts",ActionOnFailure=CONTINUE,Args=[--jars,hdfs:///apps/hudi/lib/*.jar,--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://<my-bucket>/config/json-deltastreamer_upsert.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://<my-bucket>/hudi-ds-output/person-profile-out1,--target-table,person_profile_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,UPSERT] --region us-east-1

```

In [35]:
! ~/.local/bin/aws emr add-steps --cluster-id j-1GMG9EJ4Z4ZL0 --steps Type=Spark,Name="Deltastreamer COW",ActionOnFailure=CONTINUE,Args=[--jars,hdfs:///apps/hudi/lib/*.jar,--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://vasveena-test-demo/config/json-deltastreamer_upsert.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://vasveena-test-demo/hudi-ds-output/person-profile-out1,--target-table,person_profile_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,UPSERT] --region us-east-1

{
    "StepIds": [
        "s-110EI3UO5BVNR"
    ]
}
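Conceptually, an upsert keyed on the record key and ordered by `--source-ordering-field ts` works in two stages: duplicates within the incoming batch are reduced to the record with the largest `ts`, then each surviving record replaces the stored row with the same key, or is inserted if the key is new. This pure-Python model is only an illustration, assuming DeltaStreamer's default `OverwriteWithLatestAvroPayload` behavior (the incoming record wins over the stored one regardless of `ts`); Hudi itself performs the merge per file group on Spark. `upsert` is a hypothetical function.

```python
def upsert(table, batch, key='id', ordering='ts'):
    """Conceptual model of a keyed upsert (not Hudi's implementation).

    table: dict mapping key -> record (current table state); not modified
    batch: list of incoming records
    """
    # 1. Pre-combine: keep one record per key, the one with the largest ordering
    #    value (ties keep the record seen first in this model)
    latest = {}
    for rec in batch:
        k = rec[key]
        if k not in latest or rec[ordering] > latest[k][ordering]:
            latest[k] = rec
    # 2. Merge: incoming records overwrite stored ones with the same key;
    #    keys not yet in the table are inserted
    merged = dict(table)
    merged.update(latest)
    return merged


table = {'a': {'id': 'a', 'street_address': 'old', 'ts': '2012-02-22T04:08:36'}}
batch = [
    {'id': 'a', 'street_address': 'new', 'ts': '2012-02-22T04:08:36'},
    {'id': 'b', 'street_address': 'x', 'ts': '2020-01-01T00:00:00'},
]
result = upsert(table, batch)
print(result['a']['street_address'])  # new
print(sorted(result))                 # ['a', 'b']
```

In this notebook, `generate_updates` keeps each record's original `ts` and only changes `street_address`; under the assumed payload behavior the incoming record still replaces the stored one, as the first record above shows.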


## Query the updated Hudi Dataset

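Now let's list the output location on S3 again. Notice the new Parquet files: every file ID from the bulk insert now also has a file written at the upsert's commit time, 20211011035814.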

```

$ aws s3 ls s3://<my-bucket>/hudi-ds-output/person-profile-out1/
                           PRE .hoodie/
2021-10-11 02:55:31          0 .hoodie_$folder$
2021-10-11 02:55:53         93 .hoodie_partition_metadata
2021-10-11 03:58:38    2124844 37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-22-130_20211011035814.parquet
2021-10-11 02:55:55    2121824 37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-4-34_20211011025540.parquet
2021-10-11 02:55:55    1955015 4fefd569-0e95-4457-b6cb-ef49a96549f1-0_7-4-39_20211011025540.parquet
2021-10-11 03:58:38    1958284 4fefd569-0e95-4457-b6cb-ef49a96549f1-0_9-22-137_20211011035814.parquet
2021-10-11 03:58:38    2135767 50d1f972-f25c-47ee-a381-28a2d600b028-0_0-22-128_20211011035814.parquet
2021-10-11 02:55:56    2132101 50d1f972-f25c-47ee-a381-28a2d600b028-0_8-4-40_20211011025540.parquet
2021-10-11 02:55:56    2186257 574f9dcb-1d46-4357-8d1f-8c5145cf6351-0_0-4-32_20211011025540.parquet
2021-10-11 03:58:38    2189874 574f9dcb-1d46-4357-8d1f-8c5145cf6351-0_7-22-135_20211011035814.parquet
2021-10-11 03:58:39    1808741 b26e465b-2238-4ba7-a603-86105b42e5c9-0_1-22-129_20211011035814.parquet
2021-10-11 02:55:55    1805544 b26e465b-2238-4ba7-a603-86105b42e5c9-0_6-4-38_20211011025540.parquet
2021-10-11 02:55:56    2479839 b7b62846-fe0d-4836-9146-4a2c412c4fa9-0_3-4-35_20211011025540.parquet
2021-10-11 03:58:39    2484653 b7b62846-fe0d-4836-9146-4a2c412c4fa9-0_4-22-132_20211011035814.parquet
2021-10-11 02:55:56    2353476 d68bb0f1-4c72-45ec-8cd0-77470f8b2826-0_4-4-36_20211011025540.parquet
2021-10-11 03:58:39    2358152 d68bb0f1-4c72-45ec-8cd0-77470f8b2826-0_6-22-134_20211011035814.parquet
2021-10-11 03:58:39    2544868 ebbc234b-d953-4857-8f22-ff792bd919ed-0_8-22-136_20211011035814.parquet
2021-10-11 02:55:56    2540438 ebbc234b-d953-4857-8f22-ff792bd919ed-0_9-4-41_20211011025540.parquet
2021-10-11 02:55:56    2984486 f49805bb-ccd0-446d-b1bc-36a12bb9a1fe-0_1-4-33_20211011025540.parquet
2021-10-11 03:58:40    2989781 f49805bb-ccd0-446d-b1bc-36a12bb9a1fe-0_3-22-131_20211011035814.parquet
2021-10-11 03:58:39    2265328 f8e3b197-eb7a-4a83-8d42-2e68757ff199-0_5-22-133_20211011035814.parquet
2021-10-11 02:55:55    2261358 f8e3b197-eb7a-4a83-8d42-2e68757ff199-0_5-4-37_20211011025540.parquet

```
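On a Copy on Write table, Hudi doesn't modify Parquet files in place. The upsert wrote a new version (file slice) of each affected file group, and the latest version in each group makes up the current snapshot; older versions are kept until Hudi's cleaner removes them according to its retention settings. The sketch below picks the newest base file per file ID from a listing; `latest_base_files` is hypothetical and assumes every Parquet name follows the `<fileId>_<writeToken>_<instant>.parquet` pattern seen above. The sample names are taken from this listing.

```python
def latest_base_files(names):
    """Return {file_id: newest base file name} for the Parquet files in `names`."""
    latest = {}
    for name in names:
        if not name.endswith('.parquet'):
            continue  # skip .hoodie_partition_metadata and similar entries
        file_id, _write_token, instant = name[:-len('.parquet')].split('_')
        # Instants here are fixed-width digit strings, so string comparison orders them
        if file_id not in latest or instant > latest[file_id][0]:
            latest[file_id] = (instant, name)
    return {fid: name for fid, (_, name) in latest.items()}


listing = [
    '37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-22-130_20211011035814.parquet',
    '37d44679-8b9a-4f53-864d-d3efe81b538b-0_2-4-34_20211011025540.parquet',
    '4fefd569-0e95-4457-b6cb-ef49a96549f1-0_7-4-39_20211011025540.parquet',
    '4fefd569-0e95-4457-b6cb-ef49a96549f1-0_9-22-137_20211011035814.parquet',
    '.hoodie_partition_metadata',
]
for fid, name in sorted(latest_base_files(listing).items()):
    print(fid, '->', name)
```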

Let's query the record whose address we noted earlier:

```
select _hoodie_commit_time, street_address from profile_cow where _hoodie_record_key='3b288239-a6cf-4312-8044-cd984ddd1066';

20211011025540    941 Charles Centers New Christina, WY 22122               # Old address 
20211011035814	35740 Young Orchard Suite 147 South Williamport, MT 82610   # Our recent update 


```

Now let's look at the commits with the Hudi CLI:

```
connect --path s3://<my-bucket>/hudi-ds-output/person-profile-out1/
hudi:person_profile_cow->commits show
2021-10-11 04:01:17,704 INFO timeline.HoodieActiveTimeline: Loaded instants [[20211011025540__commit__COMPLETED], [20211011035814__commit__COMPLETED]]
2021-10-11 04:01:17,728 INFO s3n.S3NativeFileSystem: Opening 's3://vasveena-test-demo/hudi-ds-output/person-profile-out1/.hoodie/20211011035814.commit' for reading
2021-10-11 04:01:18,044 INFO s3n.S3NativeFileSystem: Opening 's3://vasveena-test-demo/hudi-ds-output/person-profile-out1/.hoodie/20211011025540.commit' for reading
╔════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime     │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20211011035814 │ 21.8 MB             │ 0                 │ 10                  │ 1                        │ 150000                │ 140000                       │ 0            ║
╟────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20211011025540 │ 21.8 MB             │ 10                │ 0                   │ 1                        │ 150000                │ 0                            │ 0            ║
╚════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

```
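The numbers reported by `commits show` line up with the generator settings. The bulk insert wrote `num_files` × `num_records` records. `generate_updates` iterates over `range(1, num_files)`, so it rewrites files 1 to 14, and the upsert carried 14 × 10,000 updated records. Because a Copy on Write upsert rewrites each touched file group in full, and all 10 file groups were touched (Total Files Updated), the second commit again reports 150,000 records written. The arithmetic, restated in Python:

```python
num_records = 10000  # records per input file (from the generator cell)
num_files = 15       # input files written by generate_bulk_data()

bulk_records = num_records * num_files
updated_files = len(range(1, num_files))  # files that generate_updates() rewrites
update_records = num_records * updated_files

print(bulk_records)    # 150000
print(updated_files)   # 14
print(update_records)  # 140000
```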