In [4]:
from concurrent.futures import ThreadPoolExecutor
import time
import threading

def task(n):
    """Simulate one second of blocking work, then report which task finished.

    Args:
        n: Task identifier; it is interpolated into the returned message.

    Returns:
        The string ``"Task {n} done"``.
    """
    time.sleep(1)  # blocking stand-in for real I/O-bound work
    message = f"Task {n} done"
    return message

task_ids = [1, 2, 3, 4, 5]

# Run the five tasks on a 3-thread pool; map() yields results in input order.
with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(task, task_ids)

# Exiting the `with` block already waited for every task, so this only
# drains finished results.
for line in results:
    print(line)

Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done


In [3]:

from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    """Sleep for one second, then return ``n`` squared.

    NOTE(review): this redefines the ``task`` from an earlier cell (which
    returned a status string); whichever cell ran last wins in the kernel.
    """
    time.sleep(1)
    squared = n * n
    return squared

with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit both jobs (5 first, then 10) before waiting on either, so the
    # two sleeps overlap on the 2-thread pool.
    future1, future2 = (executor.submit(task, value) for value in (5, 10))

    for pending in (future1, future2):
        print(pending.result())  # blocks until this particular future is done

25
100


In [7]:
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

# Worker function
def sum_chunk(chunk, chunk_id):
    """Add up one chunk of numbers, logging start and end with the thread name.

    Args:
        chunk: The numbers to sum; its repr is included in the start log line.
        chunk_id: Label used only in the log lines.

    Returns:
        ``sum(chunk)``.
    """
    worker = threading.current_thread().name
    print(f"[START] Thread {worker} processing chunk {chunk_id}: {chunk}")
    total = sum(chunk)
    print(f"[END]   Thread {worker} finished chunk {chunk_id} -> sum = {total}")
    return total


# Step 1: Generate numbers 1 to 100
numbers = list(range(1, 101))
print("Generated numbers:", numbers)

# Step 2: Split into chunks of `chunk_size` numbers (10 chunks for 1..100)
chunk_size = 10
chunks = [numbers[i:i + chunk_size] for i in range(0, len(numbers), chunk_size)]

print("\nChunks created:")
for i, chunk in enumerate(chunks):
    print(f"Chunk {i}: {chunk}")

# Step 3: Use ThreadPoolExecutor
print("\nSubmitting tasks to ThreadPoolExecutor...\n")

with ThreadPoolExecutor(max_workers=5) as executor:
    # futures[i] is the job for chunks[i]; that pairing is reused below.
    futures = [executor.submit(sum_chunk, chunk, idx) for idx, chunk in enumerate(chunks)]

    # Step 4: Log results as tasks complete (completion order varies per run)
    for future in as_completed(futures):
        result = future.result()
        print(f"[COLLECT] Received partial sum: {result}")

# Read results back in submission order so partial_sums[i] belongs to
# chunks[i] and the printed list is identical on every run; appending in
# as_completed order made it nondeterministic.
partial_sums = [future.result() for future in futures]

# Step 5: Final sum
final_sum = sum(partial_sums)
# Sanity check: chunking + threading must not drop or duplicate any number.
assert final_sum == sum(numbers), f"expected {sum(numbers)}, got {final_sum}"

print("\nAll partial sums:", partial_sums)
print("FINAL SUM:", final_sum)

Generated numbers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

Chunks created:
Chunk 0: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Chunk 1: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
Chunk 2: [21, 22, 23, 24, 25, 26, 27, 28, 29, 30]
Chunk 3: [31, 32, 33, 34, 35, 36, 37, 38, 39, 40]
Chunk 4: [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]
Chunk 5: [51, 52, 53, 54, 55, 56, 57, 58, 59, 60]
Chunk 6: [61, 62, 63, 64, 65, 66, 67, 68, 69, 70]
Chunk 7: [71, 72, 73, 74, 75, 76, 77, 78, 79, 80]
Chunk 8: [81, 82, 83, 84, 85, 86, 87, 88, 89, 90]
Chunk 9: [91, 92, 93, 94, 95, 96, 97, 98, 99, 100]

Submitting tasks to ThreadPoolExecutor...

[START] Thread ThreadPoolExecutor-6

In [None]:
# Reference: the word-count job whose output is inspected below. Kept as
# comments because it submits a YARN job; run it from a cluster shell.
#
# yarn jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
# wordcount \
# -D mapreduce.job.queuename=default \
# -D mapreduce.map.memory.mb=1256 \
# -D mapreduce.reduce.memory.mb=1256 \
# -D mapreduce.map.java.opts="-Xmx1200m" \
# -D mapreduce.reduce.java.opts="-Xmx1200m" \
# -D mapreduce.map.cpu.vcores=1 \
# -D mapreduce.reduce.cpu.vcores=1 \
# /user/hadoop/input-jps \
# /user/hadoop/output_wc-jps
#
# NOTE(review): -Xmx1200m inside a 1256 MB container leaves only ~56 MB for
# non-heap JVM memory; if containers get killed for exceeding memory limits,
# consider lowering the heap (a common rule of thumb is ~80% of the container).

# Reference: Hive DDL for the raw movies table. Run it in Hive/Beeline, not in
# this Python kernel.
#
# CREATE EXTERNAL TABLE IF NOT EXISTS movies_raw (
#   movieId INT,
#   title STRING,
#   genres STRING
# )
# ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
# WITH SERDEPROPERTIES (
#   "separatorChar" = ",",
#   "quoteChar"     = "\"",
#   "escapeChar"    = "\\"
# )
# STORED AS TEXTFILE
# LOCATION '/bronze-jps/movies';
#
# NOTE(review): per the Hive docs, OpenCSVSerde exposes every column as STRING,
# so movieId is likely not a real INT -- confirm with DESCRIBE. If the CSV has a
# header row, TBLPROPERTIES ("skip.header.line.count"="1") may be needed.

import shutil
import subprocess

WC_OUTPUT_DIR = "/user/hadoop/output_wc-jps"

# The job above does not set mapreduce.job.reduces, so assuming the default of
# one reducer only part-r-00000 exists; the part-r-* glob covers every reducer
# file. No shell is involved, so the pattern reaches HDFS literally and HDFS
# expands it itself.
HDFS_COMMANDS = [
    ["hdfs", "dfs", "-ls", WC_OUTPUT_DIR],
    ["hdfs", "dfs", "-cat", f"{WC_OUTPUT_DIR}/part-r-*"],
]

if shutil.which("hdfs") is None:
    # Keep Restart & Run All working on machines without a Hadoop client.
    print("hdfs CLI not found on PATH; skipping HDFS output inspection.")
else:
    for cmd in HDFS_COMMANDS:
        # Capture output so it renders in the notebook rather than the
        # kernel's own terminal.
        proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
        print(proc.stdout, end="")
        if proc.returncode != 0:
            print(f"{' '.join(cmd)} failed (exit {proc.returncode}):\n{proc.stderr}")