An example of big data: word count #108

Open
jianzs opened this issue Dec 27, 2023 · 0 comments

import { MD5 } from 'crypto-js';
import { Bucket, Function } from "@plutolang/pluto";

const inputBucket = new Bucket("articles", { 
  exists: true // Don't try to create this bucket when deploying this application.
});
const intermediateBucket = new Bucket("intermediates");
const outputBucket = new Bucket("words", { 
  destroy: false // Don't destroy this bucket when destroying this application.
});

const mapperNum = 10;
const reducerNum = 5;

async function mapFn(mapperId: number) {
  const intermediateOutputs: { word: string, count: number }[][] = [];
  for (let i = 0; i < reducerNum; i++) {
    intermediateOutputs.push([]);
  }

  const articleNum = await inputBucket.list();
  // Mapper k handles articles k, k + mapperNum, k + 2 * mapperNum, ...
  for (let i = mapperId; i < articleNum; i += mapperNum) {
    let article = await inputBucket.get(`articles/${i}`);
    processOneArticle(article);
  }

  for (let i = 0; i < reducerNum; i++) {
    await intermediateBucket.set(`intermediates/${i}/${mapperId}`, JSON.stringify(intermediateOutputs[i]));
  }

  function processOneArticle(article: string) {
    // Replace every character that isn't a letter, a digit, or a hyphen with a space
    let processedArticle = article.replace(/[^a-zA-Z0-9\-]/g, ' ');

    // Collapse all whitespace
    processedArticle = processedArticle.replace(/\s+/g, ' ');

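    // Split the article by space, skip the empty strings left by leading or trailing spaces,
    // and iterate over each word
    processedArticle.split(' ').filter((word) => word.length > 0).forEach(function (word) {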
      const hash = MD5(word).toString();
      const partition = parseInt(hash.substring(0, 8), 16) % reducerNum;
      // Emit the word and a 1
      intermediateOutputs[partition].push({ word: word, count: 1 });
    });
  }
}

async function reduceFn(reducerId: number) {
  // Read the intermediate outputs from each mapper and count the number of times each word appears.
  // The word counts are stored in a Map, where the key is the word and the value is the count.
  // A Map is used instead of a plain object so that words such as "constructor" don't collide
  // with properties inherited from Object.prototype.
  const wordCounts = new Map<string, number>();
  for (let mapperId = 0; mapperId < mapperNum; mapperId++) {
    const intermediateOutput = JSON.parse(await intermediateBucket.get(`intermediates/${reducerId}/${mapperId}`));
    intermediateOutput.forEach(function (wordCount: { word: string, count: number }) {
      wordCounts.set(wordCount.word, (wordCounts.get(wordCount.word) ?? 0) + wordCount.count);
    });
  }

  // Convert the word counts to an array of objects, and save them to the output bucket.
  // The saved object is a list of entries, each containing a word and its count.
  const outputs: { word: string, count: number }[] = [];
  for (const [word, count] of wordCounts) {
    outputs.push({ word: word, count: count });
  }
  await outputBucket.set(`words/${reducerId}`, JSON.stringify(outputs));
}

// Sort the word counts by count and take the top k words
async function topK(k: number) {
  const words: { word: string, count: number }[] = [];
  for (let reducerId = 0; reducerId < reducerNum; reducerId++) {
    const output = JSON.parse(await outputBucket.get(`words/${reducerId}`));
    words.push(...output);
  }
  const sortedWords = words.sort((a, b) => b.count - a.count);
  return sortedWords.slice(0, k);
}

async function main() {
  // Create a list of mappers, and invoke them in parallel
  const mappers = []
  const mapFnResource = new Function(mapFn); // Should be OK to create a resource object here.
  for (let i = 0; i < mapperNum; i++) {
    const mapperId = i;  // Copy the loop variable by value
    mappers.push(mapFnResource.invoke(mapperId));
    
    // Invalid case:
    // creating the resource object from data generated at runtime.
    // const fn = new Function(async () => { await mapFn(mapperId); });
    // mappers.push(fn.invoke());
  }
  // Wait for all mappers to finish
  await Promise.all(mappers);

  // Create a list of reducers, and invoke them in parallel
  const reducers = [];
  const reduceFnResource = new Function(reduceFn);
  for (let i = 0; i < reducerNum; i++) {
    const reducerId = i; // Copy the loop variable by value
    reducers.push(reduceFnResource.invoke(reducerId));
  }
  // Wait for all reducers to finish
  await Promise.all(reducers);

  // List the top 10 words based on their frequency in the result
  const topKFnResource = new Function(topK);
  const topKWords = await topKFnResource.invoke(10);
  console.log(topKWords);
}

main();
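
The tokenizing and partitioning steps of the mapper can be tried locally without any cloud resources. The following is a minimal sketch, not part of the proposal: `fnv1a`, `tokenize`, `partitionOf`, and `localWordCount` are hypothetical helper names, and a 32-bit FNV-1a hash is swapped in for MD5 so that the sketch has no dependencies. The cleanup rules are modeled on `processOneArticle`.

```typescript
// Hypothetical stand-in for MD5: a 32-bit FNV-1a hash over UTF-16 code units.
// After tokenizing, words only contain ASCII letters, digits, and hyphens.
function fnv1a(text: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Split an article into words: anything that is not a letter, digit,
// or hyphen acts as a separator, and empty strings are dropped.
function tokenize(article: string): string[] {
  return article
    .replace(/[^a-zA-Z0-9\-]/g, " ")
    .split(/\s+/)
    .filter((word) => word.length > 0);
}

// Choose the reducer responsible for a word.
function partitionOf(word: string, reducerNum: number): number {
  return fnv1a(word) % reducerNum;
}

// Run the map, shuffle, and reduce phases in memory.
// Returns one word-count table per reducer.
function localWordCount(articles: string[], reducerNum: number): Map<string, number>[] {
  const partitions: Map<string, number>[] = [];
  for (let i = 0; i < reducerNum; i++) {
    partitions.push(new Map<string, number>());
  }
  for (const article of articles) {
    for (const word of tokenize(article)) {
      const counts = partitions[partitionOf(word, reducerNum)];
      counts.set(word, (counts.get(word) ?? 0) + 1);
    }
  }
  return partitions;
}
```

Because the partition depends only on the word, every occurrence of a word lands in the same reducer's table, which is what lets each reducer produce final counts on its own.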

Expected Behavior

During compilation, deduce from the code that the entire application contains 4 lambda and 2 bucket resource instances, and generate pure IaC (Infrastructure as Code) code to create these resource instances. The 4 lambda resource instances correspond to four sections of code: the mapper, the reducer, topK, and the main code. The main code excludes Infra API-related code.
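
As an illustration of what the deduction might produce, here is a minimal sketch of a resource inventory for this application. `ResourceSpec`, `deduced`, and `resourcesToCreate` are hypothetical names, not Pluto types. The sketch assumes that the count of 2 buckets excludes `articles`, because that bucket is declared with `exists: true` and is therefore not created at deployment.

```typescript
// Hypothetical description of one deduced resource; not a Pluto type.
type ResourceSpec =
  | { kind: "function"; name: string }
  | { kind: "bucket"; name: string; create: boolean };

// What the deducer could infer from the word count code.
const deduced: ResourceSpec[] = [
  { kind: "function", name: "main" },
  { kind: "function", name: "mapFn" },
  { kind: "function", name: "reduceFn" },
  { kind: "function", name: "topK" },
  // Assumption: a bucket declared with `exists: true` is referenced but not created.
  { kind: "bucket", name: "articles", create: false },
  { kind: "bucket", name: "intermediates", create: true },
  { kind: "bucket", name: "words", create: true },
];

// Keep only the resources the generated IaC code would have to create.
function resourcesToCreate(specs: ResourceSpec[]): ResourceSpec[] {
  return specs.filter((spec) => spec.kind === "function" || spec.create);
}
```

Under these assumptions, `resourcesToCreate(deduced)` yields the 4 functions and the 2 buckets named in the expected behavior.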

After deployment, the lambda resource instance corresponding to the main code is triggered automatically, and the logs of all 4 lambda resource instances are output in chronological order. The output ends when the main resource instance finishes executing.
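
Outputting the logs in chronological order amounts to merging several per-function log streams by timestamp. A minimal sketch follows, assuming each log entry carries a numeric timestamp. `LogEntry` and `mergeLogs` are hypothetical names, and how the logs are actually collected from the platform is out of scope here.

```typescript
// Hypothetical shape of one log line collected from a lambda instance.
interface LogEntry {
  timestamp: number; // milliseconds since the epoch
  source: string;    // which function produced the line, e.g. "mapFn"
  message: string;
}

// Merge the log streams of all functions into one chronologically ordered list.
function mergeLogs(streams: LogEntry[][]): LogEntry[] {
  const all: LogEntry[] = [];
  for (const stream of streams) {
    all.push(...stream);
  }
  return all.sort((a, b) => a.timestamp - b.timestamp);
}
```

A streaming implementation would merge incrementally rather than sort everything at the end, but the ordering rule is the same.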

During execution in the cloud, the resource objects that are created and invoked are instances of the Client classes of their resource types. For example, the return values of new Function and new Bucket in the code are Client class instances of Function and Bucket, respectively.
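
To make the idea of a Client class instance concrete, here is a minimal sketch of a bucket client backed by memory, which could stand in for the cloud-backed client in local tests. `BucketClient`, `InMemoryBucket`, and `intermediateKey` are hypothetical names. Only `get` and `set` are modeled, with signatures inferred from how the word count code calls them, so this is not the actual Pluto interface.

```typescript
// Hypothetical client interface, inferred from the calls used in the word count code.
interface BucketClient {
  get(key: string): Promise<string>;
  set(key: string, value: string): Promise<void>;
}

// In-memory stand-in for a cloud-backed bucket client.
class InMemoryBucket implements BucketClient {
  private readonly objects = new Map<string, string>();

  async get(key: string): Promise<string> {
    const value = this.objects.get(key);
    if (value === undefined) {
      throw new Error(`No object stored under key: ${key}`);
    }
    return value;
  }

  async set(key: string, value: string): Promise<void> {
    this.objects.set(key, value);
  }
}

// Key layout used between mappers and reducers in the word count code.
function intermediateKey(reducerId: number, mapperId: number): string {
  return `intermediates/${reducerId}/${mapperId}`;
}
```

With such a stand-in, a mapper can write to `intermediateKey(reducerId, mapperId)` and the matching reducer can read the same key back, mirroring the handoff in the code above.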

Additional Information

If no code outside the scopes that contain Infra API statements makes Client API calls, then the main code, apart from the Infra API handlers and resource instances, is unrelated to cloud resources and can be executed locally.
