Skip to content

Latest commit

 

History

History
340 lines (281 loc) · 7.86 KB

File metadata and controls

340 lines (281 loc) · 7.86 KB

Avro

Introduction

Avro is a row-oriented remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols and serializes data in a compact binary format. Its primary use is in Apache Hadoop, where it can provide both a serialization format for persistent data, and a wire format for communication between Hadoop nodes, and from client programs to the Hadoop services. Avro uses a schema to structure the data that is being encoded. It has two schema languages; one for human editing (Avro IDL) and another more machine-readable based on JSON.

How to Produce/Consume a message

{% tabs %} {% tab title="Node.js" %} Memphis abstracts the need for external serialization functions and embeds them within the SDK.

In node.js, we can simply produce an object. Behind the scenes, the object will be serialized based on the attached schema and data format.

Example schema:

{% code lineNumbers="true" %}

{
    "type": "record",
    "namespace": "com.example",
    "name": "contact_details",
    "fields": [
        { "name": "username", "type": "string" },
        { "name": "age", "type": "int" }
    ]
}

{% endcode %}

Code:

{% code lineNumbers="true" %}

const memphis = require("memphis-dev");

(async function () {
    try {
        await memphis.connect({
            host: "MEMPHIS_BROKER_URL",
            username: "APPLICATION_USER",
            password: "PASSWORD",
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });
        const producer = await memphis.producer({
            stationName: "STATION_NAME",
            producerName: "PRODUCER_NAME"
        });
        var payload = {
            username: "Daniel Craig", 
            age: 36
        };
        try {
            await producer.produce({
                message: payload
        });
        } catch (ex) {
            console.log(ex.message)
        }
    } catch (ex) {
        console.log(ex);
        memphis.close();
    }
})();

{% endcode %} {% endtab %}

{% tab title="Go" %} Memphis abstracts the need for external serialization functions and embeds them within the SDK.

Example schema:

{% code lineNumbers="true" %}

{
    "type": "record",
    "namespace": "com.example",
    "name": "contact_details",
    "fields": [
        { "name": "username", "type": "string" },
        { "name": "age", "type": "double" }
    ]
}

{% endcode %}

Code:

{% code lineNumbers="true" %}

package main

import (
    "fmt"
    "os"
    "github.com/memphisdev/memphis.go"
)

func main() {
    conn, err := memphis.Connect(
        "MEMPHIS_BROKER_URL", 
        "APPLICATION_TYPE_USERNAME", 
        memphis.Password("PASSWORD"),
        // memphis.AccountId(123456789), //*optional* In case you are using Memphis.dev cloud
    )
    if err != nil {
        os.Exit(1)
    }
    defer conn.Close()
    p, err := conn.CreateProducer("STATION_NAME", "PRODUCER_NAME")

    hdrs := memphis.Headers{}
    hdrs.New()
    err = hdrs.Add("key", "value")

    if err != nil {
        fmt.Printf("Header failed: %v\n", err)
        os.Exit(1)
    }
    type msgStruct struct {
        Username string `avro:"username"`
        Age      int    `avro:"age"`
    }
    msg := msgStruct{
        Username: "Daniel Craig",
        Age:      36,
    }

    err = p.Produce(msg, memphis.MsgHeaders(hdrs))

    if err != nil {
        fmt.Printf("Produce failed: %v\n", err)
        os.Exit(1)
    }
}

{% endcode %} {% endtab %}

{% tab title="Python" %} Memphis abstracts the need for external serialization functions and embeds them within the SDK.

Example schema:

{% code lineNumbers="true" %}

{
    "type": "record",
    "namespace": "com.example",
    "name": "contact_details",
    "fields": [
        { "name": "username", "type": "string" },
        { "name": "age", "type": "int" }
    ]
}

{% endcode %}

Code:

{% code lineNumbers="true" %}

import asyncio
import json
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError

async def main():
    memphis = Memphis()
    await memphis.connect(host="MEMPHIS_HOST", username="MEMPHIS_USERNAME", password="PASSWORD", account_id=ACCOUNT_ID)
    producer = await memphis.producer(
        station_name="STATION_NAME", producer_name="PRODUCER_NAME")

    headers = Headers()
    headers.add("key", "value")

    msg = {'username': 'Daniel Craig', 'age': 36}

    try:
        await producer.produce(message=msg, headers=headers)

    except Exception as e:
        print(e)
    finally:
        await asyncio.sleep(3)

    await memphis.close()

if __name__ == '__main__':
    asyncio.run(main())

{% endcode %} {% endtab %}

{% tab title="TypeScript" %} Memphis abstracts the need for external serialization functions and embeds them within the SDK.

Example schema:

{% code lineNumbers="true" %}

{
    "type": "record",
    "namespace": "com.example",
    "name": "contact_details",
    "fields": [
        { "name": "username", "type": "string" },
        { "name": "age", "type": "int" }
    ]
}

{% endcode %}

Code:

{% code lineNumbers="true" %}

import memphis from 'memphis-dev';
import type { Memphis } from 'memphis-dev/types';

(async function () {
    let memphisConnection: Memphis;

    try {
        memphisConnection = await memphis.connect({
            host: 'MEMPHIS_BROKER_URL',
            username: 'APPLICATION_TYPE_USERNAME',
            password: 'PASSWORD',
            // accountId: ACCOUNT_ID //*optional* In case you are using Memphis.dev cloud
        });

        const producer = await memphisConnection.producer({
            stationName: 'STATION_NAME',
            producerName: 'PRODUCER_NAME'
        });

        const headers = memphis.headers()
        headers.add('key', 'value');
        var msg = {
            username: "Daniel Craig", 
            age: 36
        };
        await producer.produce({
            message: msg,
            headers: headers
        });

        memphisConnection.close();
    } catch (ex) {
        console.log(ex);
    }
})();

{% endcode %} {% endtab %}

{% tab title=".NET" %} Memphis abstracts the need for external serialization functions and embeds them within the SDK.

Example schema:

{% code lineNumbers="true" %}

{
    "type": "record",
    "namespace": "com.example",
    "name": "contact_details",
    "fields": [
        { "name": "username", "type": "string" },
        { "name": "age", "type": "int" }
    ]
}

{% endcode %}

Code:

{% code overflow="wrap" lineNumbers="true" %}

using Memphis.Client;
using Memphis.Client.Producer;

using System.Runtime.Serialization;
using System.Collections.Specialized;

var options = MemphisClientFactory.GetDefaultOptions();
options.Host = "<memphis-host>";
options.Username = "<username>";
options.ConnectionToken = "<broker-token>";
/**
* In case you are using Memphis.dev cloud
* options.AccountId = "<account-id>";
*/

try
{
    var client = await MemphisClientFactory.CreateClient(options);

    var producer = await client.CreateProducer(new MemphisProducerOptions
    {
        StationName = "<memphis-station-name>",
        ProducerName = "<memphis-producer-name>",
        GenerateUniqueSuffix = true
    });

    NameValueCollection commonHeaders = new()
    {
        {
            "key-1", "value-1"
        }
    };

    ContactDetail contactDetail = new()
    {
        Username = "John Doe",
        Age = 20
    };

    await producer.ProduceAsync(contactDetail, commonHeaders);
    client.Dispose();
}
catch (Exception exception)
{
    Console.WriteLine($"Error occurred: {exception.Message}");
}


public class ContactDetail
{
    [DataMember(Name = "username")]
	public string Username { get; set; }

    [DataMember(Name = "age")]
	public int Age { get; set; }
}

{% endcode %} {% endtab %}

{% endtabs %}