Skip to content

Commit

Permalink
first commit
Browse files Browse the repository at this point in the history
  • Loading branch information
videlalvaro committed Apr 2, 2011
0 parents commit 4e492b0
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 0 deletions.
41 changes: 41 additions & 0 deletions RabbitMQUtils/MSMQ.Util.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
module MSMQ.Util

open System.Messaging

type Receiver(queue:MessageQueue) =
let mutable disposed = false
let mutable currKeepConsuming = true

let cleanup() =
if not disposed then
disposed <- true
queue.Dispose()

interface System.IDisposable with
member x.Dispose() = cleanup()

member this.keepConsuming
with get() = currKeepConsuming
and set (v:bool) = currKeepConsuming <- v

member this.startConsuming (queue:MessageQueue) (formatter:IMessageFormatter) handler =
queue.Formatter <- new BinaryMessageFormatter()
queue.ReceiveCompleted.AddHandler(new ReceiveCompletedEventHandler(handler))
queue.BeginReceive() |> ignore
while this.keepConsuming do ()

//gets MSMQ queue by name
let queue name =
if MessageQueue.Exists(name) then new MessageQueue(name)
else MessageQueue.Create(name)

//MSMQ Messages
let publishMsg (q:MessageQueue) (msg:Message) tag =
q.Send(msg)

//consumes from MSMQ and calls the RMQ publisher
let startConsuming (queue:MessageQueue) (formatter:IMessageFormatter) handler =
queue.Formatter <- new BinaryMessageFormatter()
queue.ReceiveCompleted.AddHandler(new ReceiveCompletedEventHandler(handler))
queue.BeginReceive() |> ignore
while true do ()
30 changes: 30 additions & 0 deletions RabbitMQUtils/RabbitMQ.Bridge.Bridge.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module RabbitMQ.Bridge.Bridge

open System.Messaging
open RabbitMQ.Client

open MSMQ.Util
open RabbitMQ.Helper

let onMessage (rmqPublisher:Publisher) (receiver:Receiver)(source:obj) (asyncResult:ReceiveCompletedEventArgs) =
try
let queue = source :?> MessageQueue
let msg = queue.EndReceive(asyncResult.AsyncResult)
let body = msg.Body.ToString()
if body = "quit" then
receiver.keepConsuming <- false
else
rmqPublisher.PublishMsg body
queue.BeginReceive()
|> ignore
with
| err -> printfn "Error: %s" err.Message

let msmq2rmq connOptions (exchange, rKey, basicProperties) msQueue = do
let f = factory connOptions
use conn = f.CreateConnection()
use chann = conn.CreateModel()
use publisher = new Publisher(chann, exchange, rKey, basicProperties)
use q = MSMQ.Util.queue msQueue
use receiver = new Receiver(q)
receiver.startConsuming q (new BinaryMessageFormatter()) (onMessage publisher receiver)
60 changes: 60 additions & 0 deletions RabbitMQUtils/RabbitMQ.Bridge.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProductVersion>8.0.30703</ProductVersion>
<SchemaVersion>2.0</SchemaVersion>
<ProjectGuid>{802f2e56-222b-4fd0-9f41-035c31d83234}</ProjectGuid>
<OutputType>Library</OutputType>
<RootNamespace>RabbitMQUtils</RootNamespace>
<AssemblyName>RabbitMQUtils</AssemblyName>
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
<Name>RabbitMQ.Bridge</Name>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<Tailcalls>false</Tailcalls>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<WarningLevel>3</WarningLevel>
<DocumentationFile>bin\Debug\RabbitMQUtils.XML</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<Tailcalls>true</Tailcalls>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<WarningLevel>3</WarningLevel>
<DocumentationFile>bin\Release\RabbitMQUtils.XML</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<Reference Include="mscorlib" />
<Reference Include="FSharp.Core" />
<Reference Include="RabbitMQ.Client">
<HintPath>C:\Program Files\RabbitMQ\DotNetClient\bin\RabbitMQ.Client.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Messaging" />
<Reference Include="System.Numerics" />
</ItemGroup>
<ItemGroup>
<Compile Include="MSMQ.Util.fs" />
<None Include="Script.fsx" />
<Compile Include="RabbitMQ.Helper.fs" />
<Compile Include="RabbitMQ.Bridge.Bridge.fs" />
</ItemGroup>
<Import Project="$(MSBuildExtensionsPath32)\FSharp\1.0\Microsoft.FSharp.Targets" Condition="!Exists('$(MSBuildBinPath)\Microsoft.Build.Tasks.v4.0.dll')" />
<Import Project="$(MSBuildExtensionsPath32)\..\Microsoft F#\v4.0\Microsoft.FSharp.Targets" Condition=" Exists('$(MSBuildBinPath)\Microsoft.Build.Tasks.v4.0.dll')" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>
32 changes: 32 additions & 0 deletions RabbitMQUtils/RabbitMQ.Helper.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module RabbitMQ.Helper

open RabbitMQ.Client

/// gets a ConnectionFactory
let factory (user, pass, vhost, hostname, port, protocol) : ConnectionFactory =
let f = new ConnectionFactory()
f.UserName <- user
f.Password <- pass
f.VirtualHost <- vhost
f.Protocol <- protocol
f.HostName <- hostname
f.Port <- port
f

///Publish MSMQ messages to exchange using rKey
type Publisher(chann:IModel, exchange:string, rKey:string, basicProperties:IBasicProperties) =
let mutable disposed = false

let cleanup() =
if not disposed then
disposed <- true;
chann.Dispose();

interface System.IDisposable with
member x.Dispose() = cleanup()

member this.CloseAll() = cleanup()

member this.PublishMsg (body:string) =
let content = System.Text.Encoding.UTF8.GetBytes(body)
chann.BasicPublish(exchange, rKey, basicProperties, content)
7 changes: 7 additions & 0 deletions RabbitMQUtils/Script.fsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// This file is a script that can be executed with the F# Interactive.
// It can be used to explore and test the library project.
// Note that script files will not be part of the project build.

#load "Module1.fs"
open Module1

0 comments on commit 4e492b0

Please sign in to comment.